kcat fails to deserialize Avro using schema registry, but confluent_kafka_python succeeds #356

Open
spenczar opened this issue Dec 9, 2021 · 5 comments


@spenczar

spenczar commented Dec 9, 2021

I am trying to use kcat to read Confluent Wire Format-encoded messages. They're Avro encoded, and I have a Schema Registry. This little Python script works with confluent_kafka_python v1.7.0:

from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
import time
import os


def main():
    sr_client = SchemaRegistryClient({"url": os.environ["KAFKA_SCHEMA_REGISTRY"]})

    deserializer = AvroDeserializer(sr_client)

    config = {
        "bootstrap.servers": os.environ["KAFKA_BOOTSTRAP"],
        "group.id": os.environ["KAFKA_USERNAME"] + "-testing",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "SCRAM-SHA-512",
        "sasl.username": os.environ["KAFKA_USERNAME"],
        "sasl.password": os.environ["KAFKA_PASSWORD"],
        "value.deserializer": deserializer,
    }
    consumer = DeserializingConsumer(config)

    consumer.subscribe(["alerts-simulated"])

    while True:
        msg = consumer.poll(1)
        if msg is None:
            print("waiting for messages...")
            time.sleep(1)
        else:
            deserialized = msg.value()
            print(deserialized)


if __name__ == "__main__":
    main()

But this kcat invocation fails, even though it should be equivalent and it reads the same messages:

-> % docker run -it --network=host edenhill/kcat:1.7.0 \
  -b $KAFKA_BOOTSTRAP \
  -X security.protocol=SASL_SSL \
  -X sasl.mechanisms=SCRAM-SHA-512 \
  -X sasl.username=$KAFKA_USERNAME \
  -X sasl.password=$KAFKA_PASSWORD \
  -C \
  -t alerts-simulated \
  -r $KAFKA_SCHEMA_REGISTRY \
  -s value=avro \
  -J
% Reached end of topic alerts-simulated [2] at offset 1650023
% Reached end of topic alerts-simulated [0] at offset 1612739
% ERROR: Failed to deserialize value in message in alerts-simulated [1] at offset 1652255: Failed to encode Avro as JSON: terminating

None of this data is sensitive, so I can upload the schema and message here too: message_and_schema.zip

That zip has a 'msg.avro' which is the Kafka message (including the 5-byte Confluent Wire Format header) and a "schema.json" which is the Avro schema used.

Be warned: this is a large, complex schema (about 40 KB), and the Avro message is 580 KiB of scientific simulation data.

@crccheck

crccheck commented Apr 1, 2022

I have a very similar issue where I have a JavaScript client that can successfully tail a topic while kcat errors with:

Avro/Schema-registry key deserialization: Invalid CP1 magic byte 57, expected 0: message not produced with Schema-Registry Avro framing: terminating

I know there's a magic byte because I'm pulling it off the raw message and I know it is encoded in Avro.

@edenhill
Owner

edenhill commented Apr 1, 2022

Each message must be prefixed with the Schema-Registry-specific framing: one magic byte (value 0), then the schema ID as a 4-byte big-endian integer, followed by the serialized payload (e.g., serialized Avro).
This error means the message did not have that framing.
You can verify what the framing looks like by running kcat without a deserializer and passing the key to hexdump or similar.
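To make the framing concrete, here is a minimal Python sketch that splits a raw message into its schema ID and payload and rejects anything whose first byte is not 0. The helper name `split_confluent_frame` is hypothetical (it is not part of kcat or confluent_kafka), and its error text is illustrative rather than kcat's exact message. The unframed example starts with the ASCII character "9", which is byte 57, showing one way a plain-text key can produce the value seen in the error above.

```python
import struct


def split_confluent_frame(raw: bytes) -> tuple[int, bytes]:
    """Split a Schema-Registry-framed message into (schema_id, payload).

    Assumed layout: 1 magic byte (must be 0), a 4-byte big-endian
    schema ID, then the serialized payload (e.g. Avro).
    """
    if len(raw) < 5:
        raise ValueError(f"message too short for framing: {len(raw)} bytes")
    # ">BI" = big-endian, unsigned byte (magic) + unsigned 32-bit int (schema ID)
    magic, schema_id = struct.unpack(">BI", raw[:5])
    if magic != 0:
        raise ValueError(f"invalid magic byte {magic}, expected 0")
    return schema_id, raw[5:]


# A framed message: magic 0, schema ID 1, then payload bytes.
print(split_confluent_frame(b"\x00\x00\x00\x00\x01payload"))  # (1, b'payload')

# An unframed message, e.g. a plain-text key starting with "9" (byte 57).
try:
    split_confluent_frame(b"9abc-not-framed")
except ValueError as err:
    print(err)  # invalid magic byte 57, expected 0
```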

@crccheck

crccheck commented Apr 5, 2022

oooh, my issue was staring me in the face and I didn't notice... "key deserialization". Only my values are Avro/Schema-Registry encoded. Changing my config to -s value=avro fixed it. This wasn't intuitive because other clients I've used and written search for the magic byte and only decode if it exists.
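The lenient behaviour described here (decode only when the magic byte is present) can be sketched roughly as below. This is a hypothetical illustration, not how kcat works: kcat reports an error instead. `decode_if_framed` and `fake_decode` are made-up names, and `fake_decode` merely stands in for a real Schema Registry Avro deserializer.

```python
from typing import Any, Callable


def decode_if_framed(raw: bytes, decode: Callable[[bytes], Any]) -> Any:
    """Decode only if the bytes look Schema-Registry framed; otherwise pass them through."""
    # Heuristic: at least 5 bytes and a leading magic byte of 0.
    if len(raw) >= 5 and raw[0] == 0:
        return decode(raw)
    return raw  # not framed: leave the key/value as raw bytes


def fake_decode(raw: bytes) -> str:
    """Hypothetical stand-in decoder that only reports the schema ID and payload size."""
    schema_id = int.from_bytes(raw[1:5], "big")
    return f"<schema {schema_id}: {len(raw) - 5} payload bytes>"


print(decode_if_framed(b"9876", fake_decode))                     # b'9876'
print(decode_if_framed(b"\x00\x00\x00\x00\x01abc", fake_decode))  # <schema 1: 3 payload bytes>
```

The trade-off of this heuristic is that an unframed key or value which happens to start with byte 0 would be misdetected as framed, which is one argument for an explicit per-field setting like kcat's `-s key=...` / `-s value=...`.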

I also had to pass my Schema Registry password unencoded, although I could have sworn I read that it should be URL-encoded.

@spenczar
Author

@edenhill You can confirm that msg.avro in the zip I provided uses the schema-registry specific framing ("Confluent wire format"):

-> % cat msg.avro | head -c 5 | hexdump
0000000 0000 0000 0001
0000005

This is saying that I have a message with schema ID of 1. I still see the above error from kcat.
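The hexdump output can look confusing because plain `hexdump` groups bytes into 16-bit words in host byte order (little-endian on x86), zero-padding the final odd byte. The sketch below reproduces that view from the five header bytes implied by the output above, then decodes them as magic byte plus big-endian schema ID. It assumes the fifth byte is `0x01`, which is what a trailing `0001` word means on a little-endian machine; `hexdump -C` would instead print the bytes in file order.

```python
import struct

# First five bytes of msg.avro, as implied by the hexdump output above.
header = bytes([0x00, 0x00, 0x00, 0x00, 0x01])

# Reproduce hexdump's default view: three 16-bit little-endian words,
# with the odd trailing byte zero-padded.
words = struct.unpack("<3H", header + b"\x00")
print(" ".join(f"{w:04x}" for w in words))  # 0000 0000 0001

# Decode the actual framing: 1 magic byte, then a big-endian 4-byte schema ID.
magic, schema_id = struct.unpack(">BI", header)
print(magic, schema_id)  # 0 1
```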

@edenhill
Owner

The "Failed to encode Avro as JSON" error suggests that deserialization worked, but converting the Avro objects to JSON failed.
It could be that your schema uses constructs that are not supported by the avro-c version that is used by kcat.
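One way to narrow this down is to inventory which constructs the 40 KB schema actually uses. The stdlib-only walker below (hypothetical name `collect_types`) counts Avro type names, unions, and `logicalType`s in a parsed schema. It does not know which of these avro-c fails on; that would still have to be checked against the avro-c version bundled with kcat. The inline schema is a made-up stand-in for `schema.json`.

```python
import json
from collections import Counter
from typing import Any


def collect_types(node: Any, seen: Counter) -> None:
    """Recursively count Avro type names, unions, and logicalTypes in a parsed schema."""
    if isinstance(node, str):
        seen[node] += 1  # primitive or named-type reference, e.g. "double"
    elif isinstance(node, list):
        seen["union"] += 1
        for branch in node:
            collect_types(branch, seen)
    elif isinstance(node, dict):
        kind = node.get("type")
        if "logicalType" in node:
            seen["logicalType:" + node["logicalType"]] += 1
        if kind in ("record", "error"):
            seen[kind] += 1
            for field in node.get("fields", []):
                collect_types(field["type"], seen)
        elif kind == "array":
            seen["array"] += 1
            collect_types(node["items"], seen)
        elif kind == "map":
            seen["map"] += 1
            collect_types(node["values"], seen)
        else:
            # enum, fixed, a wrapped primitive like {"type": "long"}, or a nested schema
            collect_types(kind, seen)


# Hypothetical example; for the real file use: schema = json.load(open("schema.json"))
schema = json.loads("""
{"type": "record", "name": "Example", "fields": [
  {"name": "id", "type": "long"},
  {"name": "cutout", "type": ["null", "bytes"]},
  {"name": "fluxes", "type": {"type": "array", "items": "float"}},
  {"name": "ts", "type": {"type": "long", "logicalType": "timestamp-millis"}}
]}
""")
seen = Counter()
collect_types(schema, seen)
for name, count in sorted(seen.items()):
    print(f"{name}: {count}")
```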
