I'm using Celery with SQS as the broker. Tasks have acks_late turned on, as well as a list of autoretry_for exceptions with a maximum of 5 retries. Our BaseTask class has an on_failure callback, where some failure post-processing happens. Our main SQS queue has a DLQ, but because Celery (by default) acknowledges every message, including failed ones, the message disappears after those 5 retries. This is unfortunate: we know that the errors we retry are transient, and although the retry attempts may run out, we'd like to keep the message around so we can redrive it when the system is stable again.
We thought about using acks_on_failure_or_timeout. However, because we only want this flag disabled when the exception is retriable and the attempts are exhausted, we tried setting it from the on_failure callback:
    from celery import Task

    class BaseTask(Task):
        acks_on_failure_or_timeout = True

        def on_failure(self, exception, celery_task_id, task_args, *args, **kwargs):
            # Only stop acknowledging when the failure is one of the retriable errors.
            if exception.__class__ in self.autoretry_for:
                self.acks_on_failure_or_timeout = False
            # rest of logic goes here
This doesn't work for us, though. We're using the default multi-process worker mode, and after some investigation we found that, although the task instance we deal with has the same object id as the one in the code linked above, the two execute in different processes: our callback runs in the forked worker, whereas the linked code runs in the master process. This means the acks_on_failure_or_timeout value seen there is still True.
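To illustrate the behaviour described above, here is a minimal sketch that uses plain multiprocessing rather than Celery. FakeTask, child_on_failure and run_demo are hypothetical names made up for this example, and the "fork" start method is an assumption (a POSIX system). It only shows that an attribute changed in a forked child is not visible in the parent, even though the object id is the same in both.

```python
import multiprocessing as mp


class FakeTask:
    """Hypothetical stand-in for a task object; not Celery's Task class."""
    acks_on_failure_or_timeout = True


task = FakeTask()


def child_on_failure(conn):
    # Runs in the forked child, playing the role of on_failure in a pool worker.
    task.acks_on_failure_or_timeout = False
    conn.send((id(task), task.acks_on_failure_or_timeout))
    conn.close()


def run_demo():
    # Assumption: fork is available (Linux or another POSIX system).
    ctx = mp.get_context("fork")
    parent_conn, child_conn = ctx.Pipe()
    proc = ctx.Process(target=child_on_failure, args=(child_conn,))
    proc.start()
    child_id, child_value = parent_conn.recv()
    proc.join()
    # The parent keeps its own copy of the object, so its flag is unchanged.
    same_id = child_id == id(task)
    return same_id, child_value, task.acks_on_failure_or_timeout


if __name__ == "__main__":
    print(run_demo())
```

The child reports the flag as False while the parent still reads True, which matches what we observed: the change made in the callback never reaches the process that decides whether to acknowledge.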
It seems that this isn't supported at all, so the question is: would it be possible to support it? For example, if acks_on_failure_or_timeout accepted a callback function that returns a boolean (instead of only a boolean), this could perhaps work.
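As a rough sketch of the proposal, the snippet below shows how a flag that is either a boolean or a callable could be resolved. None of this exists in Celery today: resolve_ack_policy, ack_unless_retries_exhausted, TRANSIENT_ERRORS and the callback signature (exc, retries, max_retries) are all hypothetical and only meant to make the idea concrete.

```python
# Hypothetical set of errors that we retry because they are transient.
TRANSIENT_ERRORS = (ConnectionError, TimeoutError)


def resolve_ack_policy(policy, exc, retries, max_retries):
    """Return True if the message should be acknowledged.

    `policy` is either a plain boolean (the current behaviour) or a
    callable returning a boolean (the proposed extension).
    """
    if callable(policy):
        return bool(policy(exc, retries, max_retries))
    return bool(policy)


def ack_unless_retries_exhausted(exc, retries, max_retries):
    # Do not acknowledge when a transient error has used up all its retries,
    # so the message stays in the queue and can end up in the DLQ.
    exhausted_transient = isinstance(exc, TRANSIENT_ERRORS) and retries >= max_retries
    return not exhausted_transient
```

With a policy like this, a ConnectionError on the fifth of 5 retries would resolve to "do not acknowledge", while a ValueError, or a transient error with retries left, would still be acknowledged as it is now.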
Another option would be to run the on_successful callback, and acknowledge the task (or not) in the worker instead of the master process.
I have checked the issues list for similar or identical feature requests.
I have checked the pull requests list for existing proposed implementations of this feature.
I have checked the commit log to find out if the same feature was already implemented in the main branch.
I have included all related issues and possible duplicate issues in this issue (If there are none, check this box anyway).