How do I map an Observable through a function from Future to Future?

Suppose I have a stream of events whose elements have type In:

val observableIn: Observable[In] = ???

      

And a function that converts objects of type In to objects of type Out, but "in the future":

val futureInToFutureOut: (Future[In]) => Future[Out] = ???

      

At this point, I want to transform the elements of observableIn using my function futureInToFutureOut. That is, I want the result to be a stream of events of type Out that correspond to the elements of the original stream, converted through futureInToFutureOut.

I think this should work:

val observableOut: Observable[Out] = observableIn flatMap { in =>
  Observable.from(futureInToFutureOut(Future(in)))
}

      

Is it correct? Is there a better way to do this?


1 answer


EDIT:

Your solution is correct as far as I can tell. If you want to improve performance a little, consider:

val observableOut: Observable[Out] = observableIn.flatMap { in =>
  val p = Promise.successful(in)
  Observable.from(futureInToFutureOut(p.future))
}

      

This is a bit faster because it does not schedule an asynchronous computation to complete the future, as Future.apply does.
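To make the difference concrete, here is a minimal sketch that uses only the Scala standard library. The names lengthLater and completed are hypothetical stand-ins introduced for illustration (with In = String and Out = Int); they are not part of the question.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CompletedFutureSketch {
  // Hypothetical stand-in for futureInToFutureOut, with In = String and Out = Int.
  def lengthLater(in: Future[String]): Future[Int] = in.map(_.length)

  // Wraps a value in a future that is already completed; no task is
  // submitted to the ExecutionContext to produce it.
  def completed[A](a: A): Future[A] = Promise.successful(a).future

  def main(args: Array[String]): Unit = {
    val ready = completed("event")
    println(ready.isCompleted) // true: the value is available immediately

    // Future.apply instead runs its body as a task on the ExecutionContext,
    // so the result may or may not be available right after this line.
    val scheduled = Future("event")

    // Both futures can be passed to the conversion function in the same way.
    println(Await.result(lengthLater(ready), 1.second))
    println(Await.result(lengthLater(scheduled), 1.second))
  }
}
```

Future.successful(in) from the standard library should behave the same as Promise.successful(in).future, so it may be a slightly shorter way to write the same thing.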

OLD:

I am leaving my previous suggestion below. It only works if you want to map a single event of the Observable.

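import rx.lang.scala._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global

// The promise is completed by the first event only; later events are ignored.
val promiseIn = Promise[In]()
observableIn.foreach(x => promiseIn.trySuccess(x))
val observableOut = Observable.create[Out] { observer =>
  // futureInToFutureOut takes the future itself, so apply it directly.
  futureInToFutureOut(promiseIn.future).foreach { y =>
    observer.onNext(y)
    observer.onCompleted()
  }
  Subscription() // assumes the RxScala 0.x create signature, Observer[Out] => Subscription
}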

      

Explanation



Since you are starting with an Observable[In] (i.e. a stream of events), you need a way to carry an event from that Observable into a future. The typical way to create a new future is to first create its Promise, which is the write side of the future object. You then use foreach on the Observable to call trySuccess on the promise when the first event arrives:

observableIn ---x---> foreach ---x---> promiseIn.trySuccess(x)
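As a rough illustration of why this approach handles only one event, the sketch below feeds several values to a single promise using only the standard library. The feed helper is hypothetical and a plain List stands in for the Observable; it is not RxScala code.

```scala
import scala.concurrent.Promise

object FirstEventOnly {
  // Offers each event to one promise, as the foreach callback would, and
  // records what trySuccess returned for every attempt.
  def feed[A](events: List[A]): (Option[A], List[Boolean]) = {
    val p = Promise[A]()
    val accepted = events.map(e => p.trySuccess(e))
    // value is Some(Success(x)) once the promise has been completed.
    (p.future.value.flatMap(_.toOption), accepted)
  }

  def main(args: Array[String]): Unit = {
    val (kept, accepted) = feed(List(1, 2, 3))
    println(kept)     // Some(1)
    println(accepted) // List(true, false, false)
  }
}
```

Only the first call to trySuccess completes the promise; the later calls return false and their values are dropped.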

      

As soon as an event arrives on the Observable, the promise is completed asynchronously. Now we can get the read side of the promise, that is, the future, by calling its future method, and then pass it to the conversion function: futureInToFutureOut(promiseIn.future). (If the function had type In => Out instead, you would call map on the future.) Graphically:

promiseIn.future ---x---> futureInToFutureOut ---f(x)---> futureOut

      

The resulting future, futureOut, is completed asynchronously with the converted value f(x). At this point, we need a way to emit this value through an Observable[Out]. The typical way to create a new Observable is to call the Observable.create factory method. This method hands you the write side of the Observable, the Observer, which we use to emit events by calling onNext:

futureOut ---f(x)---> foreach ---f(x)---> observer.onNext(f(x))

      

Since we know that the future emits at most one event, we call onCompleted on the observer to close the output Observable.

Edit: If you want to master Rx and Scala futures, you can consider this book, which covers these topics. Disclaimer: I am the author.
