-
-
Notifications
You must be signed in to change notification settings - Fork 4.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ERROR: Received and deleted unknown message. Wrong destination?!? #8812
Comments
Hi @selintunr, Can you help me understand you better with the problem you facing? btw, can you please provide output from the following command: pip freeze --local |
WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?The full contents of the message body was: Solution? |
I have the same problem. My message is a simple json, and I can't find anywhere any references to how "Celery-specific" messages should be. |
Is anyone able to resolve this, i have the same problem when i consume from sqs my celery worker just doesn't recognise the json format ? |
@selintunr @gioknx I am facing same issue. did you guys find out root cause or solution |
@gsudhanshu @Nisk33 Hey guys! I encountered a similar problem and found a solution that helped me. In my case, I have two services – Service A and Service B. Service A simply publishes messages to a queue, while Service B consumes these messages and processes some business logic. Service A publishes messages like this: def publish(self, queue: str, message: dict | str):
logger.info(f'Publishing message to queue: {queue}')
self.channel.queue_declare(queue=queue, durable=True)
if isinstance(message, dict):
message = json.dumps(message)
self.channel.basic_publish(
exchange=self.exchange,
routing_key=queue,
body=message,
properties=pika.BasicProperties(delivery_mode=2)
)
logger.info('Message successfully published.') Later, my Celery task received this message and produced the following error in the log: I checked the source code and found that Celery expects a specific message format when accepting messages without using syntax like Here's a brief example: tasks/consumer.py from core.celery_app import celery_app
@celery_app.task(bind=True, retry=5, default_retry_delay=60)
def consume_message(self, event_body):
<put-your-logic-here> core/celery_app.py import django
from celery import Celery
from celery.fixups.django import DjangoFixup
from django.apps import apps
from django.conf import settings
from kombu.common import Broadcast, Queue
from core.settings import project_settings
celery_app = Celery('core')
celery_app.config_from_object('django.conf:settings')
DjangoFixup(celery_app).install()
if not settings.configured:
settings.configure(**project_settings.data)
django.setup()
celery_app.autodiscover_tasks(lambda: [n.name for n in apps.get_app_configs()])
celery_app.conf.task_queues = (Queue('name-of-your-queue'))
celery_app.conf.task_routes = {
'tasks.consume_message': {
'queue': 'name-of-your-queue',
},
}
celery_app.conf.result_extended = True tasks/producers.py class RabbitMQProducer:
def __init__(
self,
host: str = settings.BROKER_HOST,
port: int = settings.BROKER_PORT,
heartbeat: int = settings.BROKER_HEARTBEAT,
blocked_connection_timeout: int = settings.BLOCKED_CONNECTION_TIMEOUT,
password: str = settings.BROKER_PASSWORD,
username: str = settings.BROKER_USERNAME,
vhost: str = settings.BROKER_VHOST,
exchange: str = settings.BROKER_EXCHANGE
):
self.exchange = exchange
self.credentials = pika.PlainCredentials(username, password)
self.connection_params = pika.ConnectionParameters(
host=host,
port=port,
heartbeat=heartbeat,
blocked_connection_timeout=blocked_connection_timeout,
credentials=self.credentials,
virtual_host=vhost
)
self.connection = pika.BlockingConnection(self.connection_params)
self.channel = self.connection.channel()
def publish(self, queue: str, message: dict | str):
logger.info(f'Publishing message to queue: {queue}')
self.channel.queue_declare(queue=queue, durable=True)
if isinstance(message, dict):
message = json.dumps(message)
message_with_celery_body = {
"args": [message],
"kwargs": {}
}
self.channel.basic_publish(
exchange=self.exchange,
routing_key=queue,
body=json.dumps(message_with_celery_body),
properties=pika.BasicProperties(
delivery_mode=2,
headers={'id': str(uuid.uuid4()), 'task': 'tasks.consume_message'},
content_type='application/json'
)
)
logger.info('Message successfully published.') As you can see, I added headers and content type to the message properties to include the necessary fields for successful message acceptance by the Celery worker in another service. I hope this helps! |
Environment & Settings
Celery version:
celery report
Output:Python Packages
pip freeze
Output:Behavior
How I run my celery worker:
celery -A sqs_worker worker -l info -P gevent
The error i get after sending a json message to the queue:
I tried pretty much everything I found on internet but nothing worked. Also tried to configure celery to accept various types of content:
The text was updated successfully, but these errors were encountered: