Is there a consistent Future.find?
I have a side effect function,
def f(): Future[Int] = {
val n = Random.nextInt()
println(s"Generated $n")
Future(n)
}
and I want to re-execute it until the predicate returns true.
def success(n: Int): Boolean = n % 2 == 0
My plan is to create Stream
results
val s = Stream.fill(10)(f)
and then use Future.find
to get the first result that satisfies the predicate.
Future.find(s)(success) map println
The problem is that it Future.find
executes all the futures in parallel and I want it to execute the futures sequentially one after the other until the predicate returns true.
scala> Future.find(s)(success) map println
Generated -237492703
Generated -935476293
Generated -1155819556
Generated -375506595
Generated -912504491
Generated -1307379057
Generated -1522265611
Generated 1163971151
Generated -516152076
res8: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@37d28f02
Some(-1155819556)
The question is how to sequentially execute the futures flow until the predicate returns true? Are there any suitable functions in the standard or third party library?
source to share
Instead of using Stream, I suggest using a different approach. Using the "Future" filter and recovering from recursively:
def findFirst[A](futureGen: => Future[A], predicate: A => Boolean): Future[A] = {
futureGen.filter(predicate).recoverWith { case _ => findFirst(futureGen, predicate) }
}
findFirst(f, success)
This will call Futures one by one until "success" returns true.
source to share
First, let us not be interested in futures:
val s1 = s.map(_.filter(success))
Now you can combine two such futures and get the first successful value with fallbackTo
. And just flush the stream starting from a known bad future:
def firstSuccess[T](stream: Stream[Future[T]]): Future[T] =
if (stream.isEmpty)
Future.failed(new NoSuchElementException)
else
stream.head.fallbackTo(firstSuccess(stream.tail))
source to share
If I understand the question, you will have to block the thread to continue sequentially. You can use Await for this.
scala> def f(): Future[Int] = {
| val n = Random.nextInt()
| println(s"Generated $n")
| Future(n)
| }
f: ()scala.concurrent.Future[Int]
scala> def success(n: Int): Boolean = n % 2 == 0
success: (n: Int)Boolean
scala> val s = Stream.fill(10)(f)
Using your path I get
scala> Future.find(s)(success) map println
Generated 551866055
Generated -561348666
Generated -1103407834
Generated -812310371
Generated -1544170923
Generated 2131361419
Generated -236722325
Generated -1473890302
Generated -82395856
Some(-561348666)
res16: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@15a2d71
I should get an answer like Some (-561348666) which you can get like
scala> s.find(x => success(Await.result(x,1 seconds))).get onSuccess {case p=> println(p)}
-561348666
source to share