Kafka and Spark: get the first offset of a topic via the API
I am playing with Spark Streaming and Kafka (with the Scala API) and would like to read messages from a set of Kafka topics with Spark Streaming.
The following method:
val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "smallest")
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
reads from Kafka up to the latest available offset, but doesn't give me the metadata that I need (since I am reading from a set of topics, I need to know the topic of every message I read), whereas this other method:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple2[String, String]](ssc, kafkaParams, currentOffsets, messageHandler)
explicitly requires offsets that I don't have.
I know there is this shell command that gives you the last offset.
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker>:<port> --topic <topic-name> --time -1 --offsets 1
and KafkaCluster.scala, a developer API that used to be public, gives you exactly what I would like.
Any hints?
You can use the code from GetOffsetShell.scala (see the Kafka API documentation):
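import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
// leader, clientId, time and nOffsets are defined as in GetOffsetShell; time = OffsetRequest.EarliestTime (-2) requests the first offset
val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
val topicAndPartition = TopicAndPartition(topic, partitionId)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets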
Or you can create a new consumer with a unique groupId and use it to get the first offset:
val consumer=new KafkaConsumer[String, String](createConsumerConfig(config.brokerList))
consumer.partitionsFor(config.topic).foreach(pi => {
val topicPartition = new TopicPartition(pi.topic(), pi.partition())
consumer.assign(List(topicPartition))
consumer.seekToBeginning()
val firstOffset = consumer.position(topicPartition)
...
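Once the first offsets are known, they can be passed to the createDirectStream overload from the question. The following is only a minimal sketch, assuming the Kafka 0.8 integration for Spark Streaming (the org.apache.spark.streaming.kafka package) is on the classpath. The object TopicAwareStream, its methods, and the (topic, partition, offset) triple format are hypothetical names chosen for illustration; they are not part of Spark or Kafka.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical helper object; names are illustrative, not part of Spark or Kafka.
object TopicAwareStream {

  // Turn (topic, partition, firstOffset) triples into the map createDirectStream expects.
  def toFromOffsets(firstOffsets: Seq[(String, Int, Long)]): Map[TopicAndPartition, Long] =
    firstOffsets.map { case (topic, partition, offset) =>
      TopicAndPartition(topic, partition) -> offset
    }.toMap

  // Keep the topic name next to each message payload.
  val messageHandler: MessageAndMetadata[String, String] => (String, String) =
    mmd => (mmd.topic, mmd.message())

  // Build a direct stream of (topic, message) pairs starting at the given offsets.
  def create(
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      firstOffsets: Seq[(String, Int, Long)]): InputDStream[(String, String)] =
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, toFromOffsets(firstOffsets), messageHandler)
}
```

With this, each element of the stream is a (topic, message) pair, so the topic of every message is available downstream. The triples would be collected with either of the two approaches above, one per partition of each topic.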