Kafka and Spark: get the first offset of a topic via the API

I am playing with Spark Streaming and Kafka (with the Scala API) and would like to read messages from a set of Kafka topics with Spark Streaming.

The following method:

val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "smallest")
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

      

reads from Kafka up to the latest available offset, but it does not give me the metadata I need (since I am reading from a set of topics, I need to know the topic of every message I read). This other method:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple2[String, String]](ssc, kafkaParams, currentOffsets, messageHandler)

explicitly requires offsets, which I do not have.

I know there is this shell command that gives you the last offset.

kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list <broker>:<port> \
  --topic <topic-name> --time -1 --offsets 1

      

There is also KafkaCluster.scala, a developer API that used to be public and that does exactly what I want.

Any hints?



1 answer


You can use the code from GetOffsetShell.scala (see the Kafka API documentation):

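import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
// time = -2 (OffsetRequest.EarliestTime) asks for the first offset, -1 (OffsetRequest.LatestTime) for the last
val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
val topicAndPartition = TopicAndPartition(topic, partitionId)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets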
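
Once the first offset of each partition is known, it can be handed to the second createDirectStream overload from the question, together with a message handler that keeps the topic name. The following is only a minimal sketch, assuming the Kafka 0.8 direct stream integration (spark-streaming-kafka). The object name TopicAwareStream and the helper toFromOffsets are hypothetical names chosen for illustration, and the input map of (topic, partition) pairs is assumed to have been filled from an offset lookup like the one above.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical helper object, not part of Spark or Kafka.
object TopicAwareStream {

  // Converts (topic, partition) -> first offset into the map type
  // that createDirectStream expects as its starting offsets.
  def toFromOffsets(earliest: Map[(String, Int), Long]): Map[TopicAndPartition, Long] =
    earliest.map { case ((topic, partition), offset) =>
      TopicAndPartition(topic, partition) -> offset
    }

  // Keeps the topic name next to each message payload.
  val messageHandler: MessageAndMetadata[String, String] => (String, String) =
    mmd => (mmd.topic, mmd.message())

  // Builds a direct stream of (topic, message) pairs starting at the given offsets.
  def create(
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      fromOffsets: Map[TopicAndPartition, Long]): InputDStream[(String, String)] =
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, messageHandler)
}
```

Each element of the resulting stream is a (topic, message) pair, so the topic is available for every record even when several topics are read at once.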

      



Or you can create a new consumer with a unique groupId and use it to get the first offset:

val consumer = new KafkaConsumer[String, String](createConsumerConfig(config.brokerList))
consumer.partitionsFor(config.topic).foreach(pi => {
  val topicPartition = new TopicPartition(pi.topic(), pi.partition())

  consumer.assign(List(topicPartition))
  consumer.seekToBeginning()
  val firstOffset = consumer.position(topicPartition)
  ...
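
A self-contained variant of the same idea might look like the sketch below. It assumes kafka-clients 0.10 or later, where seekToBeginning takes a collection of partitions (the 0.9 client used varargs instead, which is why the snippet above can call it without arguments). The object name FirstOffsets and the helper consumerProps are hypothetical; consumerProps only stands in for createConsumerConfig, whose body is not shown here.

```scala
import java.util.{Properties, UUID}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Hypothetical helper object, not part of the Kafka client library.
object FirstOffsets {

  // Minimal consumer configuration with String keys and values.
  def consumerProps(brokers: String, groupId: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("group.id", groupId)
    props.put("enable.auto.commit", "false")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  // Returns partition id -> first available offset for one topic.
  def firstOffsets(brokers: String, topic: String): Map[Int, Long] = {
    // A random group id keeps this lookup independent of any real consumer group.
    val groupId = "offset-lookup-" + UUID.randomUUID().toString
    val consumer = new KafkaConsumer[String, String](consumerProps(brokers, groupId))
    try {
      val partitions = consumer.partitionsFor(topic).asScala
        .map(pi => new TopicPartition(pi.topic(), pi.partition()))
        .toList
      consumer.assign(partitions.asJava)
      consumer.seekToBeginning(partitions.asJava)
      // position() resolves the pending seek and returns the first offset.
      partitions.map(tp => tp.partition() -> consumer.position(tp)).toMap
    } finally {
      consumer.close()
    }
  }
}
```

Calling FirstOffsets.firstOffsets with a broker list and a topic name needs a reachable broker; the returned map can then be used to build the starting offsets for the direct stream.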

      
