How to display entries from Kafka to console?

I am learning Spark Structured Streaming, but I am unable to display the output in the console.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.ProcessingTime

/**
 * Minimal Structured Streaming job that reads records from a Kafka topic and
 * prints each record's `value`, decoded as a string, to the console.
 *
 * Usage: `kafka_stream [bootstrapServers] [topic]`
 *   - `bootstrapServers` defaults to "172.21.0.187:9093"
 *   - `topic` defaults to "test"
 *
 * NOTE(review): the "kafka" source requires the `spark-sql-kafka-0-10` artifact
 * (matching your Spark and Scala versions) on the classpath; without it the
 * query fails at start-up instead of printing anything.
 * NOTE(review): the file-level import of
 * `org.apache.spark.sql.streaming.ProcessingTime` is unused here, is deprecated
 * since Spark 2.2 and was removed in Spark 3.0 (use `Trigger.ProcessingTime`
 * if a trigger is ever needed).
 */
object kafka_stream {

  /** Broker list used when no first command-line argument is given. */
  private val DefaultBootstrapServers = "172.21.0.187:9093"

  /** Topic subscribed to when no second command-line argument is given. */
  private val DefaultTopic = "test"

  /**
   * Starts the streaming query and blocks until it terminates.
   *
   * @param args optional `[bootstrapServers] [topic]` overriding the defaults
   */
  def main(args: Array[String]): Unit = {
    val bootstrapServers = args.headOption.getOrElse(DefaultBootstrapServers)
    val topic = args.lift(1).getOrElse(DefaultTopic)

    val spark = SparkSession
      .builder()
      .appName("kafka-consumer")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    try {
      spark.sparkContext.setLogLevel("WARN")

      val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topic)
        // For a freshly started query, begin at the oldest retained offset so
        // messages produced before the job started are printed too.
        .option("startingOffsets", "earliest")
        .load()

      // Kafka delivers `value` as binary; cast it to text so it is readable.
      val values = df.selectExpr("CAST(value AS STRING)").as[String]

      values.writeStream
        .outputMode("append")
        .format("console")
        // The console sink cuts cells to 20 characters by default; print
        // whole messages instead.
        .option("truncate", "false")
        .start()
        .awaitTermination()
    } finally {
      // Release the session even when the query fails
      // (awaitTermination rethrows the query's failure).
      spark.stop()
    }
  }
}

      

My input to Kafka:

my name is abc how are you ?

      

I just want to display the lines read from Kafka in the console.

+3


source to share





All Articles