Enable per-task acks_on_failure_or_timeout #9342

Open
4 tasks done
HoneyryderChuck opened this issue Oct 8, 2024 · 0 comments

HoneyryderChuck commented Oct 8, 2024

I'm using Celery with an SQS broker. Our tasks have acks_late turned on, as well as a list of autoretry_for exceptions with a max retry count of 5. Our BaseTask class defines an on_failure callback that does some failure post-processing. Our main SQS queue has a DLQ, but because Celery (by default) acknowledges every message, including failed ones, the message disappears after those 5 retries. This is unfortunate: we know the errors we retry on are transient, and while the retry attempts may run out, we'd like to keep the message around so that we can redrive it once the system is stable again.
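To make the failure mode concrete, here is a toy, stdlib-only model of a queue with a DLQ redrive policy. It is an illustration under stated assumptions, not Celery, kombu, or AWS code: `ToyQueue` and `run_until_settled` are hypothetical names. It assumes that a message that is received but never acknowledged becomes visible again, and that once it has been received `max_receive_count` times it is moved to the DLQ on the next receive, which mirrors how an SQS redrive policy's maxReceiveCount behaves.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class ToyQueue:
    """Hypothetical stand-in for an SQS queue with a redrive policy."""
    max_receive_count: int
    messages: deque = field(default_factory=deque)
    dlq: list = field(default_factory=list)
    receive_counts: dict = field(default_factory=dict)

    def receive(self):
        # A message already received max_receive_count times without
        # being deleted is moved to the dead-letter queue instead.
        while self.messages:
            msg = self.messages.popleft()
            if self.receive_counts.get(msg, 0) >= self.max_receive_count:
                self.dlq.append(msg)
                continue
            self.receive_counts[msg] = self.receive_counts.get(msg, 0) + 1
            return msg
        return None

    def ack(self, msg):
        # Acknowledging deletes the message for good.
        self.receive_counts.pop(msg, None)

    def release(self, msg):
        # Not acknowledging: the message becomes visible again later.
        self.messages.append(msg)


def run_until_settled(queue, ack_failures):
    """Process every message with a handler that always fails."""
    while (msg := queue.receive()) is not None:
        if ack_failures:
            queue.ack(msg)      # like acks_on_failure_or_timeout=True
        else:
            queue.release(msg)  # failure left unacknowledged
```

With `ack_failures=True` the message is simply gone; with `ack_failures=False` it ends up in `dlq`, ready to be redriven.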

We thought about using acks_on_failure_or_timeout. However, because we only want this flag to be disabled when the exception is retriable and the retry attempts have been exhausted, we tried setting it from the on_failure callback:

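from celery import Task


class BaseTask(Task):
    acks_on_failure_or_timeout = True

    def on_failure(self, exception, celery_task_id, task_args, *args, **kwargs):
        # Leave the message unacknowledged only for retriable exceptions
        # (isinstance also matches subclasses, as autoretry_for does).
        if isinstance(exception, tuple(self.autoretry_for)):
            self.acks_on_failure_or_timeout = False

        # rest of logic goes here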
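Stated on its own, the rule we want looks roughly like this. `should_ack_failure` is a hypothetical helper, not part of Celery, and `retries` plays the role of `self.request.retries`. Our assumption is that, with autoretry_for, on_failure is only reached for a retriable exception once the retries are used up, which is why the callback above only checks the exception type.

```python
def should_ack_failure(exc, autoretry_for, retries, max_retries):
    """Hypothetical helper: decide whether a failed message should be acked.

    Ack (and so drop) the message unless the failure is one of the
    retriable exception types *and* the retry budget has been used up;
    in that case leave it unacked so the broker can redrive it to a DLQ.
    """
    retriable = isinstance(exc, tuple(autoretry_for))
    exhausted = max_retries is not None and retries >= max_retries
    return not (retriable and exhausted)
```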

This doesn't work for us, though. We're using the default multi-process (prefork) worker pool, and after some investigation we found that, although the task instance we deal with has the same object id as the one seen by the Celery code that reads this flag, the two execute in different processes: our callback runs in the forked child process, whereas that code runs in the master process. This means that, where the acknowledgement decision is made, acks_on_failure_or_timeout is still True.
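The process split can be reproduced without Celery. This stdlib-only sketch assumes a POSIX system, since it uses the `multiprocessing` `fork` start method; `FakeTask`, `child` and `demo` are hypothetical names. The child sets the flag to False on an object with the same `id()` as the parent's, yet the parent still reads True, because fork gives the child a copy of the address space rather than shared objects.

```python
import multiprocessing


class FakeTask:
    """Hypothetical stand-in for a Celery task instance."""
    acks_on_failure_or_timeout = True


def child(task, conn):
    # Runs in the forked child, like the task's on_failure callback.
    task.acks_on_failure_or_timeout = False
    conn.send((id(task), task.acks_on_failure_or_timeout))
    conn.close()


def demo():
    ctx = multiprocessing.get_context("fork")  # POSIX only
    task = FakeTask()
    parent_conn, child_conn = ctx.Pipe()
    proc = ctx.Process(target=child, args=(task, child_conn))
    proc.start()
    child_id, child_value = parent_conn.recv()
    proc.join()
    # Same object id in both processes, but the parent's copy is untouched.
    return child_id == id(task), child_value, task.acks_on_failure_or_timeout
```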

It seems that this isn't supported at all, so the question is: would it be possible to support it? For example, if acks_on_failure_or_timeout could also be set to a callback that returns a boolean (instead of only a plain boolean), this could perhaps work.
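A minimal sketch of the proposed behaviour, assuming a hypothetical `resolve_ack_on_failure` helper on the acknowledging side; Celery does not accept a callable for `acks_on_failure_or_timeout` today, which is what this issue asks for. `RETRIABLE` and `ack_unless_retriable` are hypothetical too. The key property is that the callable is evaluated in the process that performs the acknowledgement and only looks at the exception it is given, so it does not depend on state mutated in the child.

```python
def resolve_ack_on_failure(setting, exc):
    """Hypothetical: accept either a bool or a callable(exc) -> bool."""
    if callable(setting):
        return bool(setting(exc))
    return bool(setting)


RETRIABLE = (ConnectionError, TimeoutError)  # hypothetical autoretry_for


def ack_unless_retriable(exc):
    # Evaluated where the message is acked, so it may only use what that
    # process has available, such as the exception itself.
    return not isinstance(exc, RETRIABLE)
```

Keeping a plain boolean valid means existing configurations would behave exactly as before.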

Another option would be to run the on_success callback and acknowledge the message (or not) in the worker process instead of the master process.
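A variant of that idea can be sketched with the same stdlib-only toy setup: the ack decision is taken in the child process, where the callbacks run, and sent back to the parent together with the task state, so nothing depends on a class attribute being shared between processes. In this variant the parent still performs the acknowledgement, so the child needs no broker connection of its own. `run_in_child`, `execute`, `flaky`, `broken` and `RETRIABLE` are hypothetical names, not Celery APIs, and the `fork` start method assumes a POSIX system.

```python
import multiprocessing

RETRIABLE = (ConnectionError, TimeoutError)  # hypothetical autoretry_for


def run_in_child(body, conn):
    # Child side: run the task body and, on failure, decide about the
    # ack right here, where the task's callbacks also run.
    try:
        body()
    except Exception as exc:
        conn.send(("FAILURE", not isinstance(exc, RETRIABLE)))
    else:
        conn.send(("SUCCESS", True))
    finally:
        conn.close()


def execute(body):
    ctx = multiprocessing.get_context("fork")  # POSIX only
    parent_conn, child_conn = ctx.Pipe()
    proc = ctx.Process(target=run_in_child, args=(body, child_conn))
    proc.start()
    state, ack = parent_conn.recv()
    proc.join()
    # Parent side: act on the decision the child sent back.
    return state, "ack" if ack else "leave unacked"


def flaky():
    raise ConnectionError("transient outage")  # hypothetical retriable error


def broken():
    raise ValueError("bad input")  # hypothetical non-retriable error
```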

  • I have checked the issues list for similar or identical feature requests.
  • I have checked the pull requests list for existing proposed implementations of this feature.
  • I have checked the commit log to find out if the same feature was already implemented in the main branch.
  • I have included all related issues and possible duplicate issues in this issue (If there are none, check this box anyway).