How to map an Observable with a function from Future to Future?
Suppose I have a stream of events with elements of type In:
val observableIn: Observable[In] = ???
And a function that converts objects of type In to objects of type Out, but "in the future":
val futureInToFutureOut: (Future[In]) => Future[Out] = ???
At this point, I want to transform the elements of observableIn according to my function futureInToFutureOut. That is, I want the result to be a stream of events with elements of type Out that correspond to the elements of the original stream, but converted through the function futureInToFutureOut.
I think this should work:
val observableOut: Observable[Out] = observableIn flatMap { in =>
  Observable.from(futureInToFutureOut(Future(in)))
}
Is it correct? Is there a better way to do this?
EDIT:
Your solution is correct as far as I can tell. If you want to improve performance a little, consider:
val observableOut: Observable[Out] = observableIn.flatMap { in =>
  val p = Promise.successful(in)
  Observable.from(futureInToFutureOut(p.future))
}
This is a bit faster because it does not schedule an asynchronous computation to complete the future, as Future.apply does.
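To make the difference concrete, here is a minimal sketch using only the Scala standard library. The object name and the Int element are hypothetical stand-ins for In; Future.successful is the standard shortcut for the same already-completed future.

```scala
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

object CompletedFutureSketch {
  val element = 42 // hypothetical stand-in for an element of type In

  // Completed at creation time: nothing is submitted to an ExecutionContext.
  val viaPromise: Future[Int] = Promise.successful(element).future
  val viaShortcut: Future[Int] = Future.successful(element)

  // Future.apply hands its body to the implicit ExecutionContext,
  // so the value becomes available at some later point.
  val viaApply: Future[Int] = Future(element)

  def main(args: Array[String]): Unit = {
    println(viaPromise.isCompleted)  // true
    println(viaShortcut.isCompleted) // true
    println(viaApply.isCompleted)    // timing-dependent, may print either value
  }
}
```

Whether the saving is noticeable depends on how many events flow through the stream; for a handful of elements it is unlikely to matter.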
OLD:
I am leaving my previous suggestion below. It only works if you map a single event from the Observable.
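import scala.concurrent._
import ExecutionContext.Implicits.global // execution context for the future callbacks

val promiseIn = Promise[In]() // element type In, as in the question
// Only the first event completes the promise; later trySuccess calls have no effect.
observableIn.foreach(x => promiseIn.trySuccess(x))
val observableOut = Observable.create { observer =>
  // Note: Future.map expects a plain In => Out function. With the
  // Future[In] => Future[Out] signature from the question, use
  // futureInToFutureOut(promiseIn.future) in place of the map call.
  promiseIn.future.map(futureInToFutureOut).foreach { y =>
    observer.onNext(y)
    observer.onCompleted()
  }
}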
Explanation
Since you are starting with an Observable[In] object (i.e. a stream of events), you need to find a way to carry an event from that Observable to a future. The typical way to create a new future is to first create its Promise, the input side of the future object. Then you use foreach on the Observable to call trySuccess on the promise when the first event arrives:
observableIn ---x---> foreach ---x---> promiseIn.trySuccess(x)
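The reason only the first event gets through can be sketched with the standard library alone. The list below is a hypothetical stand-in for the events an Observable would deliver; no Rx library is involved.

```scala
import scala.concurrent.Promise

object OneShotPromiseSketch {
  // Hypothetical events standing in for what observableIn would emit.
  val events = List(1, 2, 3)
  val promiseIn = Promise[Int]()

  // trySuccess returns true only for the call that actually completes the promise.
  val accepted: List[Boolean] = events.map(x => promiseIn.trySuccess(x))

  def main(args: Array[String]): Unit = {
    println(accepted)               // List(true, false, false)
    println(promiseIn.future.value) // Some(Success(1))
  }
}
```

A promise can be completed only once, so every event after the first is silently dropped. That is why this approach handles a single event rather than the whole stream.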
As soon as an event from the Observable arrives, the promise is completed asynchronously. Now we can get the read side of the promise, that is, the future, by calling its future method, and then call map on that future: promiseIn.future.map(futureInToFutureOut). Graphically:
promiseIn.future ---x---> map ---f(x)---> futureOut
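As a standalone illustration of this step, the sketch below maps over the future of a promise that has not been completed yet. The convert function is a hypothetical plain Int => String conversion, chosen because Future.map takes a function on the value itself, not on a future.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object MapOnPromiseFutureSketch {
  // Hypothetical conversion standing in for the function applied by map.
  def convert(x: Int): String = s"out-$x"

  val promiseIn = Promise[Int]()

  // Registered before the promise is completed; runs once a value arrives.
  val futureOut: Future[String] = promiseIn.future.map(convert)

  def main(args: Array[String]): Unit = {
    promiseIn.trySuccess(7)
    // Blocking with Await is for demonstration only.
    println(Await.result(futureOut, 1.second)) // out-7
  }
}
```

If the conversion itself has the shape Future[In] => Future[Out], as in the question, the promise's future would instead be passed to it directly as an argument.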
The resulting future is completed asynchronously with futureInToFutureOut(x). At this point, we need to find a way to emit this value through an Observable[Out]. The typical way to create a new Observable is to call the Observable.create factory method. This method gives you the write end of the Observable, the Observer, which we use to emit events by calling onNext:
futureOut ---f(x)---> foreach ---f(x)---> observer.onNext(f(x))
Since we know that the future emits at most one event, we call onCompleted on the observer to close the output Observable.
Edit: If you want to master Rx and Scala futures, you may want to consider this book, which covers these topics. Disclaimer: I am the author.