Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
from werkzeug.exceptions import BadRequest, Forbidden

from odoo import SUPERUSER_ID, _, api, http
from odoo.modules.registry import Registry
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY
from odoo.tools import config

from ..delay import chain, group
from ..exception import FailedJobError, RetryableJobError
Expand All @@ -38,8 +38,10 @@ def _prevent_commit(cr):
def forbidden_commit(*args, **kwargs):
raise RuntimeError(
"Commit is forbidden in queue jobs. "
"If the current job is a cron running as queue job, "
"modify it to run as a normal cron."
'You may want to enable the "Allow Commit" option on the Job '
"Function. Alternatively, if the current job is a cron running as "
"queue job, you can modify it to run as a normal cron. More details on: "
"https://github.com/OCA/queue/wiki/%5BDRAFT%5D-Upgrade-warning:-commits-inside-jobs"
)

original_commit = cr.commit
Expand Down Expand Up @@ -103,7 +105,8 @@ def _try_perform_job(cls, env, job):
job.set_done()
job.store()
env.flush_all()
env.cr.commit()
if not config["test_enable"]:
env.cr.commit()
_logger.debug("%s done", job)

@classmethod
Expand Down Expand Up @@ -141,8 +144,7 @@ def _enqueue_dependent_jobs(cls, env, job):
def _runjob(cls, env: api.Environment, job: Job) -> None:
def retry_postpone(job, message, seconds=None):
job.env.clear()
with Registry(job.env.cr.dbname).cursor() as new_cr:
job.env = api.Environment(new_cr, SUPERUSER_ID, {})
with job.in_temporary_env():
job.postpone(result=message, seconds=seconds)
job.set_pending(reset_retry=False)
job.store()
Expand All @@ -167,15 +169,15 @@ def retry_postpone(job, message, seconds=None):
# traceback in the logs we should have the traceback when all
# retries are exhausted
env.cr.rollback()
return

except (FailedJobError, Exception) as orig_exception:
buff = StringIO()
traceback.print_exc(file=buff)
traceback_txt = buff.getvalue()
_logger.error(traceback_txt)
job.env.clear()
with Registry(job.env.cr.dbname).cursor() as new_cr:
job.env = job.env(cr=new_cr)
with job.in_temporary_env():
vals = cls._get_failure_values(job, traceback_txt, orig_exception)
job.set_failed(**vals)
job.store()
Expand Down Expand Up @@ -229,6 +231,7 @@ def create_test_job(
failure_rate=0,
job_duration=0,
commit_within_job=False,
failure_retry_seconds=0,
):
if not http.request.env.user.has_group("base.group_erp_manager"):
raise Forbidden(_("Access Denied"))
Expand Down Expand Up @@ -266,6 +269,12 @@ def create_test_job(
except ValueError:
max_retries = None

if failure_retry_seconds is not None:
try:
failure_retry_seconds = int(failure_retry_seconds)
except ValueError:
failure_retry_seconds = 0

if size == 1:
return self._create_single_test_job(
priority=priority,
Expand All @@ -275,6 +284,7 @@ def create_test_job(
failure_rate=failure_rate,
job_duration=job_duration,
commit_within_job=commit_within_job,
failure_retry_seconds=failure_retry_seconds,
)

if size > 1:
Expand All @@ -287,6 +297,7 @@ def create_test_job(
failure_rate=failure_rate,
job_duration=job_duration,
commit_within_job=commit_within_job,
failure_retry_seconds=failure_retry_seconds,
)
return ""

Expand All @@ -300,6 +311,7 @@ def _create_single_test_job(
failure_rate=0,
job_duration=0,
commit_within_job=False,
failure_retry_seconds=0,
):
delayed = (
http.request.env["queue.job"]
Expand All @@ -313,6 +325,7 @@ def _create_single_test_job(
failure_rate=failure_rate,
job_duration=job_duration,
commit_within_job=commit_within_job,
failure_retry_seconds=failure_retry_seconds,
)
)
return f"job uuid: {delayed.db_record().uuid}"
Expand All @@ -329,6 +342,7 @@ def _create_graph_test_jobs(
failure_rate=0,
job_duration=0,
commit_within_job=False,
failure_retry_seconds=0,
):
model = http.request.env["queue.job"]
current_count = 0
Expand All @@ -355,6 +369,7 @@ def _create_graph_test_jobs(
failure_rate=failure_rate,
job_duration=job_duration,
commit_within_job=commit_within_job,
failure_retry_seconds=failure_retry_seconds,
)
)

Expand Down
39 changes: 29 additions & 10 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import sys
import uuid
import weakref
from contextlib import contextmanager, nullcontext
from datetime import datetime, timedelta
from random import randint

Expand Down Expand Up @@ -402,14 +403,9 @@ def __init__(
raise TypeError("Job accepts only methods of Models")

recordset = func.__self__
env = recordset.env
self.method_name = func.__name__
self.recordset = recordset

self.env = env
self.job_model = self.env["queue.job"]
self.job_model_name = "queue.job"

self.job_config = (
self.env["queue.job.function"].sudo().job_config(self.job_function_name)
)
Expand Down Expand Up @@ -459,10 +455,10 @@ def __init__(
self.exc_message = None
self.exc_info = None

if "company_id" in env.context:
company_id = env.context["company_id"]
if "company_id" in self.env.context:
company_id = self.env.context["company_id"]
else:
company_id = env.company.id
company_id = self.env.company.id
self.company_id = company_id
self._eta = None
self.eta = eta
Expand All @@ -487,7 +483,12 @@ def perform(self):
"""
self.retry += 1
try:
self.result = self.func(*tuple(self.args), **self.kwargs)
if self.job_config.allow_commit:
env_context_manager = self.in_temporary_env()
else:
env_context_manager = nullcontext()
with env_context_manager:
self.result = self.func(*tuple(self.args), **self.kwargs)
except RetryableJobError as err:
if err.ignore_retry:
self.retry -= 1
Expand All @@ -507,6 +508,16 @@ def perform(self):

return self.result

@contextmanager
def in_temporary_env(self):
with self.env.registry.cursor() as new_cr:
env = self.env
self._env = env(cr=new_cr)
try:
yield
finally:
self._env = env

def _get_common_dependent_jobs_query(self):
return """
UPDATE queue_job
Expand Down Expand Up @@ -665,6 +676,14 @@ def __hash__(self):
def db_record(self):
return self.db_records_from_uuids(self.env, [self.uuid])

@property
def env(self):
return self.recordset.env

@env.setter
def _env(self, env):
self.recordset = self.recordset.with_env(env)

@property
def func(self):
recordset = self.recordset.with_context(job_uuid=self.uuid)
Expand Down Expand Up @@ -729,7 +748,7 @@ def model_name(self):

@property
def user_id(self):
return self.recordset.env.uid
return self.env.uid

@property
def eta(self):
Expand Down
19 changes: 16 additions & 3 deletions queue_job/models/queue_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from odoo.addons.base_sparse_field.models.fields import Serialized

from ..delay import Graph
from ..exception import JobError
from ..exception import JobError, RetryableJobError
from ..fields import JobSerialized
from ..job import (
CANCELLED,
Expand Down Expand Up @@ -453,10 +453,23 @@ def related_action_open_record(self):
)
return action

def _test_job(self, failure_rate=0, job_duration=0, commit_within_job=False):
def _test_job(
self,
failure_rate=0,
job_duration=0,
commit_within_job=False,
failure_retry_seconds=0,
):
_logger.info("Running test job.")
if random.random() <= failure_rate:
raise JobError("Job failed")
if failure_retry_seconds:
raise RetryableJobError(
f"Retryable job failed, will be retried in "
f"{failure_retry_seconds} seconds",
seconds=failure_retry_seconds,
)
else:
raise JobError("Job failed")
if job_duration:
time.sleep(job_duration)
if commit_within_job:
Expand Down
11 changes: 10 additions & 1 deletion queue_job/models/queue_job_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class QueueJobFunction(models.Model):
"related_action_enable "
"related_action_func_name "
"related_action_kwargs "
"job_function_id ",
"job_function_id "
"allow_commit",
)

def _default_channel(self):
Expand Down Expand Up @@ -79,6 +80,12 @@ def _default_channel(self):
"enable, func_name, kwargs.\n"
"See the module description for details.",
)
allow_commit = fields.Boolean(
help="Allows the job to commit transactions during execution. "
"Under the hood, this executes the job in a new database cursor, "
"which incurs an overhead as it requires an extra connection to "
"the database. "
)

@api.depends("model_id.model", "method")
def _compute_name(self):
Expand Down Expand Up @@ -149,6 +156,7 @@ def job_default_config(self):
related_action_func_name=None,
related_action_kwargs={},
job_function_id=None,
allow_commit=False,
)

def _parse_retry_pattern(self):
Expand Down Expand Up @@ -184,6 +192,7 @@ def job_config(self, name):
related_action_func_name=config.related_action.get("func_name"),
related_action_kwargs=config.related_action.get("kwargs", {}),
job_function_id=config.id,
allow_commit=config.allow_commit,
)

def _retry_pattern_format_error_message(self):
Expand Down
2 changes: 1 addition & 1 deletion queue_job/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def _add_job(self, *args, **kwargs):

def _prepare_context(self, job):
# pylint: disable=context-overridden
job_model = job.job_model.with_context({})
job_model = job.env["queue.job"].with_context({})
field_records = job_model._fields["records"]
# Filter the context to simulate store/load of the job
job.recordset = field_records.convert_to_write(job.recordset, job_model)
Expand Down
2 changes: 2 additions & 0 deletions queue_job/tests/test_model_job_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def test_function_job_config(self):
' "func_name": "related_action_foo",'
' "kwargs": {"b": 1}}'
),
"allow_commit": True,
}
)
self.assertEqual(
Expand All @@ -53,5 +54,6 @@ def test_function_job_config(self):
related_action_func_name="related_action_foo",
related_action_kwargs={"b": 1},
job_function_id=job_function.id,
allow_commit=True,
),
)
6 changes: 6 additions & 0 deletions queue_job/tests/test_run_rob_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,9 @@ def test_get_failure_values(self):
self.assertEqual(
rslt, {"exc_info": "info", "exc_name": "Exception", "exc_message": "zero"}
)

def test_runjob_success(self):
job = self.env["queue.job"].with_delay()._test_job()
RunJobController._runjob(self.env, job)
self.assertEqual(job.state, "done")
self.assertEqual(job.db_record().state, "done")
2 changes: 2 additions & 0 deletions queue_job/views/queue_job_function_views.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<field name="model_id" required="1" />
<field name="method" required="1" />
<field name="channel_id" />
<field name="allow_commit" />
<field name="edit_retry_pattern" widget="ace" />
<field name="edit_related_action" widget="ace" />
</group>
Expand All @@ -24,6 +25,7 @@
<list>
<field name="name" />
<field name="channel_id" />
<field name="allow_commit" />
</list>
</field>
</record>
Expand Down