Create a Publisher and Subscriber Actor with the same Actor

I am new to acck streams. I am using kafka as the source (using the ReactiveKafka library) and doing some processing on the data through the stream and using the subscriber (EsHandler) as the sink.

Now I need to handle errors and push them to another kafka queue via an error handler. I am trying to use both Publisher and Subscriber EsHandler. I'm not sure how to include EsHandler as a middle person instead of sink.

This is my code:

val publisher = Kafka.kafka.consume(topic, "es", new StringDecoder())

val flow = Flow[String].map { elem => JsonConverter.convert(elem.toString()) }

val sink = Sink.actorSubscriber[GenModel](Props(classOf[EsHandler]))

Source(publisher).via(flow).to(sink).run()


class EsHandler extends ActorSubscriber with ActorPublisher[Model] {

  val requestStrategy = WatermarkRequestStrategy(100)

  def receive = {
    case OnNext(msg: Model) =>
      context.actorOf(Props(classOf[EsStorage], self)) ! msg

    case OnError(err: Exception) =>
      context.stop(self)

    case OnComplete =>
      context.stop(self)

    case Response(msg) =>
      if (msg.isError()) onNext(msg.getContent())
  }
}

class ErrorHandler extends ActorSubscriber {

  val requestStrategy = WatermarkRequestStrategy(100)

  def receive = {
    case OnNext(msg: Model) =>
      println(msg)
  }
}

      

+3


source to share


1 answer


We strongly recommend that you do not use your own processor (which is called reactive streams , which "Subscriber && Publisher" gives. It is rather difficult to get it right, so there is no publisher that is directly considered a subsidiary trait.

Instead, most of the time, you'll want to use the Sources

/ Sinks

(or Publishers

/ Subscribers

) provided to you and do your actions in between like map / filter steps, etc.



In fact, there is an existing implementation for Kafka Sources and Sinks that you can use, it is called reactive-kafka and is validated by the TCK Reactive Stream , so you can trust it as valid implementations.

+3


source







All Articles