Spark Python Avro Kafka Deserialiser

I have created a Kafka stream in a Python Spark application and I can parse any text that comes through it.

            kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})

I want to change this to be able to parse Avro messages from the Kafka topic. When parsing Avro messages from a file, I do it like this:

            reader = DataFileReader(open("customer.avro", "rb"), DatumReader())

I'm new to Python and Spark. How do I change the stream to parse Avro messages instead? And how can I specify the schema to use when reading Avro messages from Kafka? I have done all of this in Java before, but Python confuses me.

Edit:

I tried changing it to use an Avro decoder:

            kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1}, valueDecoder=avro.io.DatumReader(schema))

but I am getting the following error:

            TypeError: 'DatumReader' object is not callable
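The error presumably occurs because valueDecoder expects a callable that takes the raw message bytes, while avro.io.DatumReader is an object rather than a function. A minimal sketch of a wrapper, assuming the writer schema is available in a local customer.avsc file (a hypothetical path), could look like this:

import io
import avro.io
import avro.schema

# load the writer schema (customer.avsc is a hypothetical path)
schema = avro.schema.parse(open("customer.avsc").read())
reader = avro.io.DatumReader(schema)

# valueDecoder must be a callable that takes the raw message bytes
def avro_decoder(raw_bytes):
    decoder = avro.io.BinaryDecoder(io.BytesIO(raw_bytes))
    return reader.read(decoder)

kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1}, valueDecoder=avro_decoder)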



2 answers


I had the same problem - deserializing Avro messages coming from Kafka in PySpark - and solved it with the MessageSerializer module of the Confluent Schema Registry client, since in our case the schema is stored in a Confluent Schema Registry.

You can find this module at https://github.com/verisign/python-confluent-schemaregistry



from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers import MessageSerializer
schema_registry_client = CachedSchemaRegistryClient(url='http://xx.xxx.xxx:8081')
serializer = MessageSerializer(schema_registry_client)


# simple decoder to replace the Kafka streaming built-in UTF-8 decoder
def decoder(s):
    decoded_message = serializer.decode_message(s)
    return decoded_message

kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"], {"metadata.broker.list": "xxxxx:9092,yyyyy:9092"}, valueDecoder=decoder)

lines = kvs.map(lambda x: x[1])
lines.pprint()

As you can see, this code takes the new receiver-less direct approach, hence createDirectStream (see https://spark.apache.org/docs/1.5.1/streaming-kafka-integration.html for details). Note that you do not have to specify a schema yourself: decode_message reads the schema ID embedded in each Confluent-framed message and fetches the matching schema from the registry.



As @Zoltan Fedor mentioned in a comment, the provided answer is a bit outdated, as 2.5 years have passed since it was written. The confluent-kafka-python library has since evolved to support the same functionality natively. The only change required in the given code is the following.

from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer

Then you can change this line:

kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"], {"metadata.broker.list": "xxxxx:9092,yyyyy:9092"}, valueDecoder=serializer.decode_message)
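Putting it together, a minimal sketch of the updated code (assuming the same placeholder registry URL and broker list as in the first answer) would be:

from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer

# same setup as in the first answer, only the imports have changed
schema_registry_client = CachedSchemaRegistryClient(url='http://xx.xxx.xxx:8081')
serializer = MessageSerializer(schema_registry_client)

# serializer.decode_message is a bound method, i.e. a callable,
# so it can be passed directly as the valueDecoder
kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"], {"metadata.broker.list": "xxxxx:9092,yyyyy:9092"}, valueDecoder=serializer.decode_message)

lines = kvs.map(lambda x: x[1])
lines.pprint()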

I tested it and it works well. I am adding this answer for anyone who may need it in the future.
