Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None:
return None
job = Job.load(env, job_uuid)
assert job and job.state == ENQUEUED
job.set_started()
job.store()
env.cr.commit()
if not job.lock():
_logger.warning(
"was requested to run job %s, but it could not be locked",
job_uuid,
)
return None
job.set_started()
job.store()
env.cr.commit()
return job

@classmethod
Expand Down Expand Up @@ -218,7 +218,10 @@ def runjob(self, db, job_uuid, **kw):
job = self._acquire_job(env, job_uuid)
if not job:
return ""
self._runjob(env, job)
try:
self._runjob(env, job)
finally:
job.unlock()
return ""

# flake8: noqa: C901
Expand Down
65 changes: 23 additions & 42 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
DEFAULT_MAX_RETRIES = 5
RETRY_INTERVAL = 10 * 60 # seconds

PG_ADVISORY_EXECUTION_LOCK_ID = 965655780

_logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -221,56 +223,33 @@ def load_many(cls, env, job_uuids):
recordset = cls.db_records_from_uuids(env, job_uuids)
return {cls._load_from_db_record(record) for record in recordset}

def add_lock_record(self) -> None:
    """
    Create row in db to be locked while the job is being performed.
    """
    # Mirrors the queue_job row into queue_job_lock (same id for both
    # columns) so a later SELECT ... FOR NO KEY UPDATE can lock it.
    # ON CONFLICT DO NOTHING makes this idempotent when the row already
    # exists (e.g. the job is retried after a previous start).
    self.env.cr.execute(
        """
        INSERT INTO
            queue_job_lock (id, queue_job_id)
        SELECT
            id, id
        FROM
            queue_job
        WHERE
            uuid = %s
        ON CONFLICT(id)
        DO NOTHING;
        """,
        [self.uuid],
    )

def lock(self) -> bool:
"""Lock row of job that is being performed.
"""Lock job that is being performed using a session-level advisory lock.

Return False if a job cannot be locked: it means that the job is not in
STARTED state or is already locked by another worker.
"""
# 2147483647 is the max value for int, which is the max value accepted
# by pg_try_advisory_lock when using two arguments, as the job id might
# be higher than that, we use a modulo to be sure to never exceed the limit.
# A collision is highly unlikely because ids are sequential so a modulo
# should not cause two different jobs to have the same lock id at the
# same time. Even then, they would run one after the other.
self.env.cr.execute(
"""
SELECT
*
FROM
queue_job_lock
WHERE
queue_job_id in (
SELECT
id
FROM
queue_job
WHERE
uuid = %s
AND state = %s
)
FOR NO KEY UPDATE SKIP LOCKED;
""",
[self.uuid, STARTED],
"SELECT pg_try_advisory_lock(%s, "
"(CAST(%s AS bigint) %% 2147483647)::integer)",
(PG_ADVISORY_EXECUTION_LOCK_ID, self._record_id),
)

# 1 job should be locked
return bool(self.env.cr.fetchall())

def unlock(self) -> None:
    """Release the session-level advisory lock taken by :meth:`lock`.

    Must use the exact same (classid, objid) key pair as ``lock()``:
    the fixed application-wide lock id plus the job's database id folded
    into the 32-bit integer range with a modulo, so the unlock targets
    the same advisory lock. Session-level advisory locks survive commits,
    hence the explicit release once the job run is finished.
    """
    # NOTE(review): pg_advisory_unlock returns false (with a warning) if
    # this session does not actually hold the lock — the result is not
    # checked here, so a spurious unlock is silently ignored.
    self.env.cr.execute(
        "SELECT pg_advisory_unlock(%s, "
        "(CAST(%s AS bigint) %% 2147483647)::integer)",
        (PG_ADVISORY_EXECUTION_LOCK_ID, self._record_id),
    )

@classmethod
def _load_from_db_record(cls, job_db_record):
stored = job_db_record
Expand Down Expand Up @@ -298,6 +277,8 @@ def _load_from_db_record(cls, job_db_record):
identity_key=stored.identity_key,
)

job_._record_id = stored.id

if stored.date_created:
job_.date_created = stored.date_created

Expand Down Expand Up @@ -424,6 +405,7 @@ def __init__(

self._uuid = job_uuid
self.graph_uuid = None
self._record_id = None

self.args = args
self.kwargs = kwargs
Expand Down Expand Up @@ -788,7 +770,6 @@ def set_started(self):
self.state = STARTED
self.date_started = datetime.now()
self.worker_pid = os.getpid()
self.add_lock_record()

def set_done(self, result=None):
self.state = DONE
Expand Down
37 changes: 15 additions & 22 deletions queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import odoo
from odoo.tools import config

from ..job import PG_ADVISORY_EXECUTION_LOCK_ID
from . import queue_job_config
from .channels import ENQUEUED, NOT_DONE, ChannelManager

Expand Down Expand Up @@ -213,7 +214,8 @@ def set_job_enqueued(self, uuid):
)

def _query_requeue_dead_jobs(self):
return """
return [
"""
UPDATE
queue_job
SET
Expand Down Expand Up @@ -255,27 +257,18 @@ def _query_requeue_dead_jobs(self):
WHERE
state IN ('enqueued','started')
AND date_enqueued < (now() AT TIME ZONE 'utc' - INTERVAL '10 sec')
AND (
id in (
SELECT
queue_job_id
FROM
queue_job_lock
WHERE
queue_job_lock.queue_job_id = queue_job.id
FOR NO KEY UPDATE SKIP LOCKED
)
OR NOT EXISTS (
SELECT
1
FROM
queue_job_lock
WHERE
queue_job_lock.queue_job_id = queue_job.id
)
AND NOT EXISTS (
SELECT 1
FROM pg_locks
WHERE locktype = 'advisory'
AND classid = %s
AND objid = (queue_job.id %% 2147483647)::integer
AND granted = true
)
RETURNING uuid
"""
""",
(PG_ADVISORY_EXECUTION_LOCK_ID,),
]

def requeue_dead_jobs(self):
"""
Expand Down Expand Up @@ -304,9 +297,9 @@ def requeue_dead_jobs(self):
"""

with closing(self.conn.cursor()) as cr:
query = self._query_requeue_dead_jobs()
query, query_args = self._query_requeue_dead_jobs()

cr.execute(query)
cr.execute(query, query_args)

for (uuid,) in cr.fetchall():
_logger.warning("Re-queued dead job with uuid: %s", uuid)
Expand Down
Loading