Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ERROR: Received and deleted unknown message. Wrong destination?!? #8812

Open
selintunr opened this issue Jan 20, 2024 · 6 comments
Open

ERROR: Received and deleted unknown message. Wrong destination?!? #8812

selintunr opened this issue Jan 20, 2024 · 6 comments

Comments

@selintunr
Copy link

Environment & Settings

Celery version:

celery report Output:

software -> celery:5.3.6 (emerald-rush) kombu:5.3.5 py:3.9.18
            billiard:4.2.0 py-amqp:5.2.0
platform -> system:Darwin arch:64bit
            kernel version:22.6.0 imp:CPython
loader   -> celery.loaders.default.Loader
settings -> transport:amqp results:disabled

deprecated_settings: None

Python Packages

pip freeze Output:

aiohttp==3.9.1
aiosignal==1.3.1
amqp==5.2.0
anyio==4.1.0
async-timeout==4.0.3
attrs==23.1.0
audioread==3.0.1
billiard==4.2.0
boto3==1.34.23
botocore==1.34.23
celery==5.3.6
certifi==2023.11.17
cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
click-didyoumean==0.3.0
click-plugins==1.1.1
click-repl==0.3.0
decorator==5.1.1
distro==1.8.0
et-xmlfile==1.1.0
exceptiongroup==1.2.0
fastapi==0.95.2
frozenlist==1.4.0
future==0.18.3
gevent==23.9.1
greenlet==3.0.3
h11==0.14.0
httpcore==1.0.2
httpx==0.25.2
idna==3.6
jmespath==1.0.1
joblib==1.3.2
kombu==5.3.5
llvmlite==0.39.1
mangum==0.17.0
multidict==6.0.4
numba==0.56.4
numpy==1.23.5
openpyxl==3.1.2
packaging==23.2
pandas==2.1.1
platformdirs==4.1.0
pooch==1.8.0
prompt-toolkit==3.0.43
pycparser==2.21
pycurl==7.45.2
pydantic==1.10.13
pyloudnorm==0.1.1
python-dateutil==2.8.2
python-dotenv==1.0.0
python-multipart==0.0.6
pytz==2023.3.post1
PyYAML==6.0.1
redis==5.0.1
requests==2.31.0
resampy==0.4.2
s3transfer==0.10.0
scikit-learn==1.3.2
scipy==1.11.4
six==1.16.0
sniffio==1.3.0
SQLAlchemy==2.0.17
starlette==0.27.0
threadpoolctl==3.2.0
tqdm==4.66.1
typing_extensions==4.9.0
tzdata==2023.3
urllib3==1.26.18
uvicorn==0.22.0
vine==5.1.0
wcwidth==0.2.12
yarl==1.9.4
zope.event==5.0
zope.interface==6.1

Behavior

How I run my celery worker: celery -A sqs_worker worker -l info -P gevent

The error i get after sending a json message to the queue:

[2024-01-20 12:27:01,277: WARNING/MainProcess] Received and deleted unknown message.  Wrong destination?!?

The full contents of the message body was: body: '{"task": "sampleTaskName", "id": "123e4567-e89b-12d3-a456-426655440000", "args": ["arg1", "arg2", "arg3"], "kwargs": {"kwarg1": "value1", "kwarg2": "value2"}, "retries": 2}' (172b)
{content_type:None content_encoding:None
  delivery_info:{'sqs_message': {'MessageId': '17eb5f73-959e-4036-9928-6859beb108ce', 'ReceiptHandle': 'AQEBTVogfmZOVkeZVsiFLEMByL1VJEwXoeNnwNzRs6qlbxweSWwFkKZzfARnxteAloBD9DiPAlSvRn3zfNKScTgcboFh31lmsRPMBlvr70Q1v1py3QHxLWmziu1jyQXoHt0UGJ/BItqGD7WdUYkcw91EozDQKDS6EmPxE9pKlsk49dACP8PvSqrhlbOxZRQwOx9dVHWacEsIqz5Vjenl9UqC5jvCzVb3kqwrWh57aJrMw8jc4c 7TREejeZ43tmdCZQTwDjnZAdWmcHrc5JWAbeP7PONwMSu/4WBAyH5Lu/d0ggy/F8QFklLjxsa/li5pSgkdGSHH2ZcIxmBVL1Ha6TrlMRZBkcAMfUMVP6k0czVOrggZUVdEkGfBIOp0daSRY7b', 'MD5OfBody': '44163d3eb01ef5b433a086eaf2be12a2', 'Body': '{"task": "sampleTaskName", "id": "123e4567-e89b-12d3-a456-426655440000", "args": ["arg1", "arg2", "arg3"], "kwargs": {"kwarg1": "value1", "kwarg2": "value2"}, "retries": 2}'}, 'sqs_queue': 'https://sqs.us-east-1.amazonaws.com/855429961467/test-queue'} headers={}}

I tried pretty much everything I found on internet but nothing worked. Also tried to configure celery to accept various types of content:

app = Celery('sqs_worker', broker=f'sqs://{AWS_ACCESS_KEY_ID}:{AWS_SECRET_ACCESS_KEY}@')
app.conf.task_queues = [Queue('test-queue', Exchange('default'), routing_key='test-queue')]
app.conf.task_default_queue = 'test-queue'
app.conf.accept_content = ['application/json', 'json', 'pickle', 'yaml', 'msgpack']
@50-Course
Copy link
Contributor

Hi @selintunr,

Can you help me understand you better with the problem you facing? btw, can you please provide output from the following command:

pip freeze --local

@Fagner-lourenco
Copy link

WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?The full contents of the message body was:

Solution?

@gioknx
Copy link

gioknx commented Apr 5, 2024

I have the same problem. My message is a simple json, and I can't find anywhere any references to how "Celery-specific" messages should be.

@Nisk33
Copy link

Nisk33 commented Jul 24, 2024

Is anyone able to resolve this, i have the same problem when i consume from sqs my celery worker just doesn't recognise the json format ?

@gsudhanshu
Copy link

@selintunr @gioknx I am facing same issue. did you guys find out root cause or solution

@AlexanderZharyuk
Copy link

@gsudhanshu @Nisk33 Hey guys!

I encountered a similar problem and found a solution that helped me.

In my case, I have two services – Service A and Service B. Service A simply publishes messages to a queue, while Service B consumes these messages and processes some business logic. Service A publishes messages like this:

def publish(self, queue: str, message: dict | str):
    logger.info(f'Publishing message to queue: {queue}')
    self.channel.queue_declare(queue=queue, durable=True)
    if isinstance(message, dict):
        message = json.dumps(message)

    self.channel.basic_publish(
        exchange=self.exchange,
        routing_key=queue,
        body=message,
        properties=pika.BasicProperties(delivery_mode=2)
    )
    logger.info('Message successfully published.')

Later, my Celery task received this message and produced the following error in the log: [2024-09-10 18:59:07,152: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?

I checked the source code and found that Celery expects a specific message format when accepting messages without using syntax like my_task.apply_async(...). To consume messages from another service, worker, etc., you need to include id and task fields in the message headers to implement this flow and avoid the "wrong destination" error.

Here's a brief example:

tasks/consumer.py

from core.celery_app import celery_app

@celery_app.task(bind=True, retry=5, default_retry_delay=60)
def consume_message(self, event_body):
    <put-your-logic-here>

core/celery_app.py

import django
from celery import Celery
from celery.fixups.django import DjangoFixup
from django.apps import apps
from django.conf import settings
from kombu.common import Broadcast, Queue

from core.settings import project_settings


celery_app = Celery('core')
celery_app.config_from_object('django.conf:settings')
DjangoFixup(celery_app).install()

if not settings.configured:
    settings.configure(**project_settings.data)
    django.setup()

celery_app.autodiscover_tasks(lambda: [n.name for n in apps.get_app_configs()])
celery_app.conf.task_queues = (Queue('name-of-your-queue'))
celery_app.conf.task_routes = {
    'tasks.consume_message': {
        'queue': 'name-of-your-queue',
    },
}
celery_app.conf.result_extended = True

tasks/producers.py

class RabbitMQProducer:

    def __init__(
        self,
        host: str = settings.BROKER_HOST,
        port: int = settings.BROKER_PORT,
        heartbeat: int = settings.BROKER_HEARTBEAT,
        blocked_connection_timeout: int = settings.BLOCKED_CONNECTION_TIMEOUT,
        password: str = settings.BROKER_PASSWORD,
        username: str = settings.BROKER_USERNAME,
        vhost: str = settings.BROKER_VHOST,
        exchange: str = settings.BROKER_EXCHANGE
    ):
        self.exchange = exchange
        self.credentials = pika.PlainCredentials(username, password)
        self.connection_params = pika.ConnectionParameters(
            host=host,
            port=port,
            heartbeat=heartbeat,
            blocked_connection_timeout=blocked_connection_timeout,
            credentials=self.credentials,
            virtual_host=vhost
        )
        self.connection = pika.BlockingConnection(self.connection_params)
        self.channel = self.connection.channel()

    def publish(self, queue: str, message: dict | str):
        logger.info(f'Publishing message to queue: {queue}')
        self.channel.queue_declare(queue=queue, durable=True)
        if isinstance(message, dict):
            message = json.dumps(message)

        message_with_celery_body = {
            "args": [message],
            "kwargs": {}
        }
        self.channel.basic_publish(
            exchange=self.exchange,
            routing_key=queue,
            body=json.dumps(message_with_celery_body),
            properties=pika.BasicProperties(
                delivery_mode=2,
                headers={'id': str(uuid.uuid4()), 'task': 'tasks.consume_message'},
                content_type='application/json'
            )
        )
        logger.info('Message successfully published.')

As you can see, I added headers and content type to the message properties to include the necessary fields for successful message acceptance by the Celery worker in another service. I hope this helps!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

7 participants