Reactive programming with RxScala

I have an Observable that connects to a service via the Socket protocol. Connection to the socket is done through the client library. The client library I am using has a java.util.Observer with which I can register the events that are inserted into it.

final class MyObservable extends Observable[MyEvent] {

  def subscribe(subscriber: Subscriber[MyEvent]) = {
    // connect to the Socket (Step: 1)
    // get the responses that are pushed (Step: 2)
    // transform them into MyEvent type (Step: 3)
  }
}

      

I have two open questions that I don't understand.

How can I get the result of Step: 3 in my subscriber?

Every time I get a MyEvent, with a subscriber like below, I see a new connection is being created. Eventually, steps 1, 2, and 3 are triggered for each incoming event.

val myObservable = new MyObservale()
myObservable.subscribe()

      

+3


source to share


1 answer


If I don't understand your question, you just call onNext

:

def subscribe(subscriber: Subscriber[MyEvent]) = {
  // connect to the Socket (Step: 1)
  // get the responses that are pushed (Step: 2)
  // transform them into MyEvent type (Step: 3)

  // finally notify the subscriber:
  subscriber.onNext(myEventFromStep3)
}

      



and the code that gets signed will do something like:

myObservable.subscribe(onNext = println(_))

      

+2


source







All Articles