Spark Streaming with multiple Kafka streams

I am creating Kafka streams with the following code:

// one receiver-based stream per iteration; all of them are unioned below
val streams = (1 to 5) map { i =>
    KafkaUtils.createStream[....](
              streamingContext,
              Map( .... ),
              Map(topic -> numOfPartitions),
              StorageLevel.MEMORY_AND_DISK_SER
              ).filter(...)
               .mapPartitions(...)
               .reduceByKey(....)
}
val unifiedStream = streamingContext.union(streams)
unifiedStream.foreachRDD(...)
streamingContext.start()

      

I give each stream its own group id. When I run the application, only a subset of the Kafka messages is received, and the executor just sits waiting on the foreachRDD call. If I create only one stream, everything works fine. There are no exceptions in the logs.

I don't know why the application gets stuck there. Does this mean there are not enough resources?





1 answer


You may want to try setting the following parameter:



new SparkConf().set("spark.streaming.concurrentJobs", "5")
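
By default spark.streaming.concurrentJobs is 1, so only one streaming job runs at a time and later batches queue behind it. Below is a minimal sketch of how the setting might be wired in before the StreamingContext is created; the ZooKeeper quorum, topic name, group ids, thread counts and batch interval are placeholder assumptions, not values taken from the question:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object MultiStreamApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("multi-kafka-streams")
      .set("spark.streaming.concurrentJobs", "5") // allow up to 5 streaming jobs to run concurrently

    val ssc = new StreamingContext(conf, Seconds(10)) // batch interval is an assumption

    // One receiver-based stream per group id; each receiver occupies an executor core,
    // so the application needs more cores than receivers to make progress.
    val streams = (1 to 5).map { i =>
      KafkaUtils.createStream(
        ssc,
        "zk-host:2181",            // ZooKeeper quorum (placeholder)
        s"my-consumer-group-$i",   // a distinct group id per stream, as in the question
        Map("my-topic" -> 2),      // topic -> number of consumer threads (placeholder)
        StorageLevel.MEMORY_AND_DISK_SER
      ).map(_._2)                  // keep only the message value
    }

    val unified = ssc.union(streams)
    unified.foreachRDD(rdd => println(s"batch size: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}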

      









