Spark Streaming + Kafka: how to get the topic name of a Kafka message

I am using Spark Streaming to read from a list of Kafka topics, following the official API documentation. The method I use:

val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "largest")
val topics = Set(configuration.getKafkaInputTopic())
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, topics)

      

I'm wondering how an executor reads messages from a list of topics. What is its policy? Will it read one topic and then, once it has consumed all of that topic's messages, move on to the other topics?

Most importantly, after calling this method, how can I find out which topic a message in the RDD came from?

stream.foreachRDD(rdd => rdd.map(t => {
        val key = t._1
        val json = t._2
        val topic = ???
}))

      



1 answer


I'm wondering how an executor reads messages from a list of topics. What is its policy? Will it read one topic and then move on to the other topics once it has consumed all of that topic's messages?

In the direct stream approach, the driver is responsible for reading the offsets of the Kafka topics you want to consume. It builds a mapping between topics, partitions, and the offsets to be read, then assigns each worker an offset range to read from a specific Kafka topic partition. This means that if a worker can run two tasks at the same time (just as an example; it can usually run many more), it can read from two separate Kafka topics concurrently.
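The planning step described above can be sketched as a small, self-contained model. This is not Spark's actual implementation: the names `TopicPartition`, `OffsetRange`, and `BatchPlanner.planBatch` are hypothetical, and starting unseen partitions at offset 0 is an assumption made only for illustration. The point is that each (topic, partition) pair gets its own offset range, so ranges for different topics can be handed to tasks that run side by side instead of one topic after another.

```scala
// Simplified model of direct-stream batch planning; NOT Spark's real code.
// TopicPartition, OffsetRange and BatchPlanner are hypothetical names.
final case class TopicPartition(topic: String, partition: Int)

final case class OffsetRange(tp: TopicPartition, fromOffset: Long, untilOffset: Long) {
  // Number of messages covered by this range.
  def count: Long = untilOffset - fromOffset
}

object BatchPlanner {
  // Driver side: compare the offsets consumed so far with the latest offsets
  // available in Kafka and emit one range (one task) per topic partition.
  def planBatch(
      consumed: Map[TopicPartition, Long],
      latest: Map[TopicPartition, Long]): Seq[OffsetRange] =
    latest.toSeq
      .sortBy { case (tp, _) => (tp.topic, tp.partition) }
      .map { case (tp, until) =>
        // Assumption for this sketch: a partition not seen before starts at 0.
        OffsetRange(tp, consumed.getOrElse(tp, 0L), until)
      }

  def main(args: Array[String]): Unit = {
    val consumed = Map(
      TopicPartition("clicks", 0) -> 40L,
      TopicPartition("orders", 0) -> 100L)
    val latest = Map(
      TopicPartition("clicks", 0) -> 90L,
      TopicPartition("clicks", 1) -> 10L,
      TopicPartition("orders", 0) -> 150L)

    // Each printed range would be read by its own task on some executor.
    planBatch(consumed, latest).foreach { r =>
      println(s"${r.tp.topic}-${r.tp.partition}: [${r.fromOffset}, ${r.untilOffset})")
    }
  }
}
```

In this model an executor with two free task slots could pick up the `clicks-0` and `orders-0` ranges at the same time, which matches the behaviour described above.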

After calling this method, how can I find out which topic a message in the RDD came from?



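You can use the overload of createDirectStream that takes a message handler of type MessageAndMetadata[K, V] => R. This overload also requires the starting offset for each topic partition: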

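import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Starting offset for every topic partition you want to consume.
val topicsToPartitions: Map[TopicAndPartition, Long] = ???

// The fifth type parameter is the handler's result type: (topic, message).
val stream: DStream[(String, String)] =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc,
        kafkaParams,
        topicsToPartitions,
        (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message()))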

      
