Reactive programming with RxScala
I have an Observable that connects to a service via the Socket protocol. Connection to the socket is done through the client library. The client library I am using has a java.util.Observer with which I can register the events that are inserted into it.
final class MyObservable extends Observable[MyEvent] {
def subscribe(subscriber: Subscriber[MyEvent]) = {
// connect to the Socket (Step: 1)
// get the responses that are pushed (Step: 2)
// transform them into MyEvent type (Step: 3)
}
}
I have two open questions that I don't understand.
How can I get the result of Step: 3 in my subscriber?
Every time I get a MyEvent, with a subscriber like below, I see a new connection is being created. Eventually, steps 1, 2, and 3 are triggered for each incoming event.
val myObservable = new MyObservale()
myObservable.subscribe()
+3
source to share
1 answer
If I don't understand your question, you just call onNext
:
def subscribe(subscriber: Subscriber[MyEvent]) = {
// connect to the Socket (Step: 1)
// get the responses that are pushed (Step: 2)
// transform them into MyEvent type (Step: 3)
// finally notify the subscriber:
subscriber.onNext(myEventFromStep3)
}
and the code that gets signed will do something like:
myObservable.subscribe(onNext = println(_))
+2
source to share