Spark Structured Streaming does not resume from Kafka offsets on restart

We have a long-running Spark Structured Streaming query that reads from Kafka, and we would like this query to pick up where it left off after a restart. However, we set startingOffsets to "earliest", and what we see after restarting is that the query reads the Kafka topic again from the beginning.

Our query looks like this:

  val extract = sparkSession
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "server:port")
    .option("subscribe", "topic")
    .option("startingOffsets", "earliest")
    .load()

  val query: StreamingQuery = extract
    .writeStream
    .option("checkpointLocation", "/tmp/checkpoint/kafka/")
    .foreach(writer)
    .start()

We can see that the checkpoint directory is created correctly and that the offset files contain the offsets we expect.

On restart, we see a message like:

25-07-2017 14:35:32 INFO  ConsumerCoordinator:231 - Setting newly assigned partitions [KafkaTopic-2, KafkaTopic-1, KafkaTopic-0, KafkaTopic-3] for group spark-kafka-source-dedc01fb-c0a7-40ea-8358-a5081b961968--1396947302-driver

We do start the query with "earliest", but the documentation says:

This only applies when a new streaming query is started, and resuming will always pick up from where the query left off.

Shouldn't this mean that restarting our application causes the query to resume where it left off?
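If we read the documentation correctly, the expected behavior could be modeled like this (a simplified sketch for illustration only, an assumption on our part — this is not Spark's actual implementation): startingOffsets should be consulted only when no checkpoint exists, and on restart the checkpointed offsets should win.

```scala
// Simplified model of the documented restart behavior (an assumption for
// illustration -- NOT Spark's actual code): startingOffsets is consulted
// only for a brand-new query; on restart, checkpointed offsets win.
object StartResolution {
  sealed trait Start
  case object Earliest extends Start
  case object Latest extends Start
  final case class FromCheckpoint(offsets: Map[Int, Long]) extends Start

  def resolve(checkpointOffsets: Option[Map[Int, Long]],
              startingOffsets: String): Start =
    checkpointOffsets match {
      case Some(offs)                          => FromCheckpoint(offs) // restart
      case None if startingOffsets == "latest" => Latest               // new query
      case None                                => Earliest             // new query
    }
}
```

Under this model, a restart with an intact checkpoint should never re-read from the beginning of the topic, regardless of the "earliest" setting — which is what makes the behavior we observe so confusing.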

Installing " group.id

" for Kafka is not allowed with Spark Structured Streaming. See So: Note that the following Kafka parameters cannot be set and the Kafka source is throwing an exception.

I tried adding queryName, in case it is used to identify the query across restarts, but it had no effect.

We are using Spark 2.1 on YARN.

Any ideas on why this isn't working or what we're doing wrong?

UPDATE FROM LOGS:

From driver

From worker
