Reading Avro messages from Kafka using Structured Streaming in Spark 2.1

I followed @Ralph Gonzalez's post on this thread to read Avro messages from Kafka using Structured Streaming in Spark 2.1, but I am getting the following error:

org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:99)
    at com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:98)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
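One thing worth checking: Avro's binary encoding writes the length of a string or bytes value as a zig-zag varint, and the single byte 0x4F (ASCII 'O') decodes to exactly -40. Avro container files start with the magic bytes 'O', 'b', 'j', 1, so this error may mean that the Kafka values are container files rather than bare binary-encoded records. That is a hypothesis, not something confirmed for this topic. The helper readZigZagLong below is a hypothetical, dependency-free sketch of the decoding rule, not Avro's own code:

```scala
object ZigZagDemo {
  // Decode one variable-length zig-zag long from the start of a byte array,
  // following the rule in the Avro specification for int/long values.
  def readZigZagLong(bytes: Array[Byte]): Long = {
    var result = 0L
    var shift = 0
    var i = 0
    var more = true
    while (more && i < bytes.length) {
      val b = bytes(i) & 0xff
      result |= (b & 0x7f).toLong << shift // low 7 bits carry the payload
      more = (b & 0x80) != 0               // high bit set means another byte follows
      shift += 7
      i += 1
    }
    // Undo zig-zag: even raw values are non-negative, odd raw values are negative.
    (result >>> 1) ^ -(result & 1)
  }
}

// First four bytes of an Avro container file: 'O', 'b', 'j', 1
val containerMagic = Array[Byte](0x4f, 0x62, 0x6a, 0x01)
println(ZigZagDemo.readZigZagLong(containerMagic)) // prints -40
```

If the first bytes of a raw Kafka value turn out to be 4F 62 6A 01, the value carries a container-file header and a plain binaryDecoder would misread it as a length prefix.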

I came across @Michael G. Noll's post here, which suggests using a DataFileStream instead of a binaryDecoder, as shown below.

DatumReader<GenericRecord> datumReader = new SpecificDatumReader<GenericRecord>(schema); 
DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(inputStream, datumReader);

I tried using this in Scala, but without success. Below is the current state of the code.

def main(args: Array[String]): Unit = {

  val KafkaBroker = "**.**.**.**:9092"
  val InTopic = "avro"

  // Get Spark session
  val session = SparkSession
    .builder
    .master("local[*]")
    .appName("myapp")
    .getOrCreate()

  // Load streaming data
  import session.implicits._

  //val msg=data.selectExpr("CAST(value AS Array[Byte])")
  //val rec = reader.read(null, decoder.binaryDecoder(msg, null))
  //val disp=msg.writeStream.outputMode("append").format("console").start()

  // Note: reader and decoder are defined elsewhere and are not shown here.
  val data = session
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KafkaBroker)
    .option("subscribe", InTopic)
    .load()
    .select($"value".as[Array[Byte]])
    .map(d => {
      // This is the call that throws the AvroRuntimeException above
      val rec = reader.read(null, decoder.binaryDecoder(d, null))
      val payload = rec.get("payload").asInstanceOf[Byte].toString
      new KafkaMessage(payload)
    })

  // Print each decoded message to the console
  val query = data.writeStream
    .outputMode("Append")
    .format("console")
    .start()

  query.awaitTermination()
}
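For comparison, here is a minimal sketch of how the decoding step could use DataFileStream from Scala. It assumes that each Kafka value is a complete Avro container file (header with embedded schema, followed by data blocks), which has not been verified for this topic. The object and method names are hypothetical; DataFileStream and GenericDatumReader are the real Avro classes.

```scala
import java.io.ByteArrayInputStream

import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

import scala.collection.mutable.ArrayBuffer

object ContainerDecoding {
  // Decode every record held in one Kafka value.
  // Assumption: the value is a whole Avro container file, not a bare record.
  def decodeContainer(value: Array[Byte]): Seq[GenericRecord] = {
    // No schema is passed here; DataFileStream takes it from the file header.
    val datumReader = new GenericDatumReader[GenericRecord]()
    val stream = new DataFileStream[GenericRecord](new ByteArrayInputStream(value), datumReader)
    try {
      val records = ArrayBuffer.empty[GenericRecord]
      while (stream.hasNext) records += stream.next()
      records.toList
    } finally {
      stream.close()
    }
  }
}
```

In the streaming job this would replace the map step with a flatMap, since one container file can hold several records. The records would still need to be converted to KafkaMessage inside that function, because Spark has no built-in encoder for GenericRecord.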

My schema and case class look like this:

case class KafkaMessage(payload: String)

val schemaString = """{
  "type" : "record",
  "name" : "HdfsEvent",
  "namespace" : "com.expedia.txb.domain.hdfs",
  "fields" : [ {
    "name" : "payload",
    "type" : {
      "type" : "bytes",
      "java-class" : "[B"
    }
  } ]
}"""
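A side note on the payload field: with Avro's generic API, a field of type bytes comes back as a java.nio.ByteBuffer, so casting it to Byte would fail with a ClassCastException once decoding itself succeeds. Below is a minimal sketch of turning such a buffer into a String. The helper name is hypothetical, and UTF-8 is an assumption about what the payload contains.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object PayloadText {
  // Copy the readable bytes of a buffer into a String.
  // duplicate() is used so the caller's buffer position is left untouched.
  def bytesToString(buf: ByteBuffer): String = {
    val copy = buf.duplicate()
    val arr = new Array[Byte](copy.remaining())
    copy.get(arr)
    new String(arr, StandardCharsets.UTF_8) // assumes the payload is UTF-8 text
  }
}
```

With a decoded record rec, the call would look like PayloadText.bytesToString(rec.get("payload").asInstanceOf[ByteBuffer]).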

I've spent the last two days on this, so any help would be much appreciated. Thanks.
