diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py
index 2c8a745924..b04ea8f031 100644
--- a/queue_job/controllers/main.py
+++ b/queue_job/controllers/main.py
@@ -78,15 +78,15 @@ def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None:
             return None
         job = Job.load(env, job_uuid)
         assert job and job.state == ENQUEUED
-        job.set_started()
-        job.store()
-        env.cr.commit()
         if not job.lock():
             _logger.warning(
                 "was requested to run job %s, but it could not be locked",
                 job_uuid,
             )
             return None
+        job.set_started()
+        job.store()
+        env.cr.commit()
         return job
 
     @classmethod
@@ -218,7 +218,10 @@ def runjob(self, db, job_uuid, **kw):
         job = self._acquire_job(env, job_uuid)
         if not job:
             return ""
-        self._runjob(env, job)
+        try:
+            self._runjob(env, job)
+        finally:
+            job.unlock()
         return ""
 
     # flake8: noqa: C901
diff --git a/queue_job/job.py b/queue_job/job.py
index 96cd6b6c46..b8cb6b05e4 100644
--- a/queue_job/job.py
+++ b/queue_job/job.py
@@ -37,6 +37,8 @@
 DEFAULT_MAX_RETRIES = 5
 RETRY_INTERVAL = 10 * 60  # seconds
 
+PG_ADVISORY_EXECUTION_LOCK_ID = 965655780
+
 _logger = logging.getLogger(__name__)
 
 
@@ -221,56 +223,33 @@ def load_many(cls, env, job_uuids):
         recordset = cls.db_records_from_uuids(env, job_uuids)
         return {cls._load_from_db_record(record) for record in recordset}
 
-    def add_lock_record(self) -> None:
-        """
-        Create row in db to be locked while the job is being performed.
-        """
-        self.env.cr.execute(
-            """
-            INSERT INTO
-                queue_job_lock (id, queue_job_id)
-            SELECT
-                id, id
-            FROM
-                queue_job
-            WHERE
-                uuid = %s
-            ON CONFLICT(id)
-            DO NOTHING;
-            """,
-            [self.uuid],
-        )
-
     def lock(self) -> bool:
-        """Lock row of job that is being performed.
+        """Lock job that is being performed using a session-level advisory lock.
 
         Return False if a job cannot be locked: it means that the job is
         not in STARTED state or is already locked by another worker.
         """
+        # 2147483647 is the max value for int, which is the max value accepted
+        # by pg_try_advisory_lock when using two arguments, as the job id might
+        # be higher than that, we use a modulo to be sure to never exceed the limit.
+        # A collision is highly unlikely because ids are sequential so a modulo
+        # should not cause two different jobs to have the same lock id at the
+        # same time. Even then, they would run one after the other.
         self.env.cr.execute(
-            """
-            SELECT
-                *
-            FROM
-                queue_job_lock
-            WHERE
-                queue_job_id in (
-                    SELECT
-                        id
-                    FROM
-                        queue_job
-                    WHERE
-                        uuid = %s
-                        AND state = %s
-                )
-            FOR NO KEY UPDATE SKIP LOCKED;
-            """,
-            [self.uuid, STARTED],
+            "SELECT pg_try_advisory_lock(%s, (%s %% 2147483647)::integer)",
+            (PG_ADVISORY_EXECUTION_LOCK_ID, self._record_id),
         )
-
-        # 1 job should be locked
-        return bool(self.env.cr.fetchall())
+        # pg_try_advisory_lock always returns exactly one row holding a
+        # boolean, so fetchall() would always be truthy; read the boolean.
+        return self.env.cr.fetchone()[0]
 
+    def unlock(self) -> None:
+        """Release the session-level advisory lock."""
+        self.env.cr.execute(
+            "SELECT pg_advisory_unlock(%s, (%s %% 2147483647)::integer)",
+            (PG_ADVISORY_EXECUTION_LOCK_ID, self._record_id),
+        )
+
     @classmethod
     def _load_from_db_record(cls, job_db_record):
         stored = job_db_record
@@ -298,6 +277,8 @@ def _load_from_db_record(cls, job_db_record):
             identity_key=stored.identity_key,
         )
 
+        job_._record_id = stored.id
+
         if stored.date_created:
             job_.date_created = stored.date_created
 
@@ -424,6 +405,7 @@ def __init__(
 
         self._uuid = job_uuid
         self.graph_uuid = None
+        self._record_id = None
 
         self.args = args
         self.kwargs = kwargs
@@ -788,7 +770,6 @@ def set_started(self):
         self.state = STARTED
         self.date_started = datetime.now()
         self.worker_pid = os.getpid()
-        self.add_lock_record()
 
     def set_done(self, result=None):
         self.state = DONE
diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py
index 95e134ba44..d1d6625548 100644
--- a/queue_job/jobrunner/runner.py
+++ b/queue_job/jobrunner/runner.py
@@ -33,6 +33,7 @@
 import odoo
 from odoo.tools import config
 
+from ..job import PG_ADVISORY_EXECUTION_LOCK_ID
 from . import queue_job_config
 from .channels import ENQUEUED, NOT_DONE, ChannelManager
 
@@ -213,7 +214,8 @@ def set_job_enqueued(self, uuid):
         )
 
     def _query_requeue_dead_jobs(self):
-        return """
+        return [
+            """
             UPDATE
                 queue_job
             SET
@@ -255,27 +257,18 @@ def _query_requeue_dead_jobs(self):
             WHERE
                 state IN ('enqueued','started')
                 AND date_enqueued < (now() AT TIME ZONE 'utc' - INTERVAL '10 sec')
-                AND (
-                    id in (
-                        SELECT
-                            queue_job_id
-                        FROM
-                            queue_job_lock
-                        WHERE
-                            queue_job_lock.queue_job_id = queue_job.id
-                        FOR NO KEY UPDATE SKIP LOCKED
-                    )
-                    OR NOT EXISTS (
-                        SELECT
-                            1
-                        FROM
-                            queue_job_lock
-                        WHERE
-                            queue_job_lock.queue_job_id = queue_job.id
-                    )
+                AND NOT EXISTS (
+                    SELECT 1
+                    FROM pg_locks
+                    WHERE locktype = 'advisory'
+                        AND classid = %s
+                        AND objid = (queue_job.id %% 2147483647)::integer
+                        AND granted = true
                 )
             RETURNING uuid
-            """
+            """,
+            (PG_ADVISORY_EXECUTION_LOCK_ID,),
+        ]
 
     def requeue_dead_jobs(self):
         """
@@ -304,9 +297,9 @@
         """
         with closing(self.conn.cursor()) as cr:
-            query = self._query_requeue_dead_jobs()
+            query, query_args = self._query_requeue_dead_jobs()
 
-            cr.execute(query)
+            cr.execute(query, query_args)
             for (uuid,) in cr.fetchall():
                 _logger.warning("Re-queued dead job with uuid: %s", uuid)
 