Is there a consistent Future.find?

I have a side effect function,

def f(): Future[Int] = {
  val n = Random.nextInt()
  println(s"Generated $n")
  Future(n)
}

      

and I want to re-execute it until the predicate returns true.

def success(n: Int): Boolean = n % 2 == 0

      

My plan is to create Stream

results

val s = Stream.fill(10)(f)

      

and then use Future.find

to get the first result that satisfies the predicate.

Future.find(s)(success) map println

      

The problem is that it Future.find

executes all the futures in parallel and I want it to execute the futures sequentially one after the other until the predicate returns true.

scala> Future.find(s)(success) map println
Generated -237492703
Generated -935476293
Generated -1155819556
Generated -375506595
Generated -912504491
Generated -1307379057
Generated -1522265611
Generated 1163971151
Generated -516152076
res8: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@37d28f02
Some(-1155819556)

      

The question is how to sequentially execute the futures flow until the predicate returns true? Are there any suitable functions in the standard or third party library?

+3


source to share


3 answers


Instead of using Stream, I suggest using a different approach. Using the "Future" filter and recovering from recursively:

def findFirst[A](futureGen: => Future[A], predicate: A => Boolean): Future[A] = {
  futureGen.filter(predicate).recoverWith { case _ => findFirst(futureGen, predicate) }
}

findFirst(f, success)

      



This will call Futures one by one until "success" returns true.

+4


source


First, let us not be interested in futures:

val s1 = s.map(_.filter(success))

      



Now you can combine two such futures and get the first successful value with fallbackTo

. And just flush the stream starting from a known bad future:

def firstSuccess[T](stream: Stream[Future[T]]): Future[T] = 
  if (stream.isEmpty)
    Future.failed(new NoSuchElementException)
  else
    stream.head.fallbackTo(firstSuccess(stream.tail))

      

+1


source


If I understand the question, you will have to block the thread to continue sequentially. You can use Await for this.

scala> def f(): Future[Int] = {
 |   val n = Random.nextInt()
 |   println(s"Generated $n")
 |   Future(n)
 | }
f: ()scala.concurrent.Future[Int]

scala> def success(n: Int): Boolean = n % 2 == 0
success: (n: Int)Boolean

scala> val s = Stream.fill(10)(f)

      

Using your path I get

scala> Future.find(s)(success) map println
Generated 551866055
Generated -561348666
Generated -1103407834
Generated -812310371
Generated -1544170923
Generated 2131361419
Generated -236722325
Generated -1473890302
Generated -82395856
Some(-561348666)
res16: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@15a2d71

      

I should get an answer like Some (-561348666) which you can get like

scala> s.find(x => success(Await.result(x,1 seconds))).get onSuccess {case p=> println(p)}
-561348666

      

0


source







All Articles