
Propagate serializer change to AsynPool._create_write_handlers.send_job #9091

Open
11 of 19 tasks
GregoirePelegrin opened this issue Jun 24, 2024 · 1 comment · May be fixed by #9100

Checklist

  • I have verified that the issue exists against the main branch of Celery.
  • This has already been asked to the discussions forum first.
  • I have read the relevant section in the
    contribution guide
    on reporting bugs.
  • I have checked the issues list
    for similar or identical bug reports.
  • I have checked the pull requests list
    for existing proposed fixes.
  • I have checked the commit log
    to find out if the bug was already fixed in the main branch.
  • I have included all related issues and possible duplicate issues
    in this issue (If there are none, check this box anyway).
  • I have tried to reproduce the issue with pytest-celery and added the reproduction script below.

Mandatory Debugging Information

  • I have included the output of celery -A proj report in the issue.
    (if you are not able to do this, then at least specify the Celery
    version affected).
  • I have verified that the issue exists against the main branch of Celery.
  • I have included the contents of pip freeze in the issue.
  • I have included all the versions of all the external dependencies required
    to reproduce this bug.

Optional Debugging Information

  • I have tried reproducing the issue on more than one Python version
    and/or implementation.
  • I have tried reproducing the issue on more than one message broker and/or
    result backend.
  • I have tried reproducing the issue on more than one version of the message
    broker and/or result backend.
  • I have tried reproducing the issue on more than one operating system.
  • I have tried reproducing the issue on more than one workers pool.
  • I have tried reproducing the issue with autoscaling, retries,
    ETA/Countdown & rate limits disabled.
  • I have tried reproducing the issue after downgrading
    and/or upgrading Celery and its dependencies.

Related Issues and Possible Duplicates

Related Issues

Possible Duplicates

Environment & Settings

Celery version: 5.4.0

celery report Output:

software -> celery:5.4.0 (opalescent) kombu:5.3.7 py:3.11.9
            billiard:4.2.0 py-amqp:5.2.0
platform -> system:Linux arch:64bit, ELF
            kernel version:5.10.0-30-amd64 imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:amqp results:rpc:///

broker_url: 'amqp://guest:********@localhost:5672//'
result_backend: 'rpc:///'
result_expires: 300
deprecated_settings: None
task_default_queue: 'worker'
event_serializer: 'dill'
task_serializer: 'dill'
result_serializer: 'dill'
accept_content: ['dill',
 'application/x-my-content',
 'json',
 'application/json',
 'application/x-python-serialize']
broker_connection_retry_on_startup: True

Steps to Reproduce

Required Dependencies

  • Minimal Python Version: tested in 3.11.9
  • Minimal Celery Version: tested in 5.4.0
  • Minimal Kombu Version: tested in 5.3.7
  • Minimal Broker Version: N/A or Unknown
  • Minimal Result Backend Version: N/A or Unknown
  • Minimal OS and/or Kernel Version: N/A or Unknown
  • Minimal Broker Client Version: N/A or Unknown
  • Minimal Result Backend Client Version: N/A or Unknown

Python Packages

pip freeze Output:

amqp==5.2.0
billiard==4.2.0
celery==5.4.0
click==8.1.7
click-didyoumean==0.3.1
click-plugins==1.1.1
click-repl==0.3.0
dill==0.3.8
kombu==5.3.7
prompt_toolkit==3.0.47
python-dateutil==2.9.0.post0
six==1.16.0
tzdata==2024.1
vine==5.1.0
wcwidth==0.2.13

Other Dependencies

N/A

Minimally Reproducible Test Case

main.py

from celery.canvas import Signature

from worker import worker


def main():
    task_signature: Signature = worker.signature("method_task")
    task = task_signature.delay(function=lambda x: f"{x}{x}")

if __name__ == "__main__":
    main()

worker.py

from typing import Callable

from celery import Celery
import dill
from kombu.serialization import register


def attach_test_task(celery_worker: Celery) -> Celery:
    @celery_worker.task(name="method_task")
    def method_task(function: Callable = lambda x: f"{x}"):
        print(function("Hello"))

    return celery_worker

def register_dill():
    def encode(obj):
        return dill.dumps(obj=obj)

    def decode(s):
        deserialized_object = dill.loads(str=s)
        return deserialized_object

    register(
        name="dill",
        encoder=encode,
        decoder=decode,
        content_type="application/x-my-content",
        content_encoding="binary"
    )


def build_worker() -> Celery:
    celery_worker: Celery = Celery(
        "generated_class_task_parameter",
        broker="amqp://guest:guest@localhost:5672",
        backend="rpc://",
        result_expires=300
    )

    register_dill()

    celery_worker.conf.task_default_queue = "worker"
    celery_worker.conf.event_serializer = "dill"
    celery_worker.conf.task_serializer = "dill"
    celery_worker.conf.result_serializer = "dill"
    celery_worker.conf.accept_content = [
        "dill", "application/x-my-content",
        "json", "application/json", "application/x-python-serialize",
    ]
    celery_worker.conf.broker_connection_retry_on_startup = True

    celery_worker = attach_test_task(celery_worker=celery_worker)

    return celery_worker


worker: Celery = build_worker()

Expected Behavior

While the default pickle.dumps serializer cannot serialize objects defined in <locals> (such as lambdas or instances of dynamically generated classes), dill.dumps can.
I therefore expected the worker to print "HelloHello".
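The difference between the two serializers can be checked in isolation (a minimal sketch, independent of Celery; it assumes only that the dill package listed in the pip freeze above is installed):

```python
import pickle

import dill


def make_fn():
    # Defined in a local scope, so its qualified name contains '<locals>'.
    return lambda x: f"{x}{x}"


fn = make_fn()

# The standard-library pickle refuses local objects...
try:
    pickle.dumps(fn)
except AttributeError as exc:
    print(f"pickle: {exc}")

# ...while dill serializes the function by value and round-trips it.
restored = dill.loads(dill.dumps(fn))
print(restored("Hello"))  # HelloHello
```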

Actual Behavior

The worker raises the same serialization error that the client raises when the dill serializer is not used.

Back trace

[2024-06-24 08:49:01,424: INFO/MainProcess] Task method_task[d214496f-ee29-485d-b8fe-600a5957a494] received
[2024-06-24 08:49:01,425: CRITICAL/MainProcess] Unrecoverable error: AttributeError("Can't pickle local object 'main.<locals>.<lambda>'")
Traceback (most recent call last):
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/celery/worker/worker.py", line 202, in start
    self.blueprint.start(self)
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
           ^^^^^^^^^^^^^^^^
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 340, in start
    blueprint.start(self)
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 746, in start
    c.loop(*c.loop_args())
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/kombu/asynchronous/hub.py", line 373, in create_loop
    cb(*cbargs)
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/kombu/transport/base.py", line 248, in on_readable
    reader(loop)
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/kombu/transport/base.py", line 230, in _read
    drain_events(timeout=0)
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/amqp/connection.py", line 526, in drain_events
    while not self.blocking_read(timeout):
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/amqp/connection.py", line 532, in blocking_read
    return self.on_inbound_frame(frame)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/amqp/method_framing.py", line 77, in on_frame
    callback(channel, msg.frame_method, msg.frame_args, msg)
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/amqp/connection.py", line 538, in on_inbound_method
    return self.channels[channel_id].dispatch_method(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/amqp/abstract_channel.py", line 156, in dispatch_method
    listener(*args)
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/amqp/channel.py", line 1629, in _on_basic_deliver
    fun(msg)
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/kombu/messaging.py", line 656, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
           ^^^^^^^^^^^^^
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 685, in on_task_received
    strategy(
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/celery/worker/strategy.py", line 207, in task_message_handler
    handle(req)
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/celery/worker/worker.py", line 220, in _process_task_sem
    return self._quick_acquire(self._process_task, req)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/kombu/asynchronous/semaphore.py", line 75, in acquire
    callback(*partial_args, **partial_kwargs)
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/celery/worker/worker.py", line 225, in _process_task
    req.execute_using_pool(self.pool)
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/celery/worker/request.py", line 754, in execute_using_pool
    result = apply_async(
             ^^^^^^^^^^^^
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/celery/concurrency/base.py", line 153, in apply_async
    return self.on_apply(target, args, kwargs,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/billiard/pool.py", line 1527, in apply_async
    self._quick_put((TASK, (result._job, None, func, args, kwds)))
  File "/home/gregoirepelegrinadmin/anaconda3/envs/local_serializer/lib/python3.11/site-packages/celery/concurrency/asynpool.py", line 868, in send_job
    body = dumps(tup, protocol=protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't pickle local object 'main.<locals>.<lambda>'

This seems to indicate that the arguments are re-serialized somewhere with the default pickle.dumps serializer after the worker receives them.
The stack trace points to the AsynPool._create_write_handlers.send_job closure, which uses the dumps parameter of the AsynPool._create_write_handlers method. This parameter defaults to pickle.dumps and is not overridden in the AsynPool.register_with_event_loop method, where _create_write_handlers is called. This confirms the theory that serialization is attempted with the default pickle.dumps serializer.
I have also confirmed, by using breakpoints to call the deserialized objects, that the workers correctly receive the task parameter serialized with dill.
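The failure can also be reproduced without a broker: send_job serializes the whole job tuple with the standard-library pickle, so any locally defined callable in the task kwargs breaks it, regardless of which kombu serializer delivered the message to the worker. A stdlib-only sketch (the tuple shape below is illustrative, not billiard's exact wire format):

```python
import pickle


def build_job():
    # Mimics a job whose task kwargs contain a locally defined callable,
    # as produced by task_signature.delay(function=lambda x: f"{x}{x}").
    task_kwargs = {"function": lambda x: f"{x}{x}"}
    return ("TASK", (1, None, "method_task", (), task_kwargs))


try:
    # Effectively what send_job does with its default dumps=pickle.dumps,
    # independent of the configured task_serializer.
    pickle.dumps(build_job(), protocol=pickle.HIGHEST_PROTOCOL)
except AttributeError as exc:
    print(f"serialization failed: {exc}")
```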

Additional notes

For more information on the reasoning behind this attempt and the successive steps I went through to find this, please see the following Stack Overflow post.
I have also tried discussing this problem on the GitHub Celery Discussions page.


GregoirePelegrin commented Jun 26, 2024

After replacing dumps=_pickle.dumps with dumps=dill.dumps in

pack=pack, dumps=_pickle.dumps,
as well as replacing loads=pickle_loads with loads=dill.loads in

def _recv(timeout, loads=pickle_loads):

(billiard/billiard/pool.py#444), the issue is gone!
How could one go about fixing this properly? Should the default serializer values be dill.dumps and dill.loads, or should there be a global configuration variable for choosing the serializer used there? I haven't found anything broken by this change.
