Synchronized feedback with Akka threads

What I'm trying to achieve is to implement something like a synchronized feedback loop with akka threads.

Let's say you have Flow[Int].filter(_ % 5 == 0)

. When you cast a stream Int

to that stream and zip the tuples right behind it, you get something like

(0,0)
(5,1)
(10,2)

      

Is there a way to emit Option[Int]

that indicates whether the stream emitted an element after I pushed the next one through it or not?

(Some(0),0)
(None, 1)
(None, 2)
(None, 3)
(None, 4)
(Some(5), 5)
(None, 6)
...

      

I thought about implementing my own DetachedStage

right in front of me and behind Flow

to hold the state, whenever the thread entered the scene before I knew it needed the next item. When there was no element on the stage, it was Nothing.

Unfortunately, the results are not good and are not removed by many positions.

side notes

The Flow filter is just an example, it can be a very long flow where I cannot provide the ability to emit Option

at every step in it, so I really need to know if the flow was pushed next or not requested the next one from the downstream

I also played around with conflate

and expand

, but this is even worse with positional offsets of the results

One thing that I changed in the configuration is the buffer initial

and max

for the stream, so that I can be sure that the specified requirement is valid after the element that I passed through it.

It would be nice to get some suggestions on how to fix this problem!

+3


source to share


1 answer


I can't pinpoint exactly what you're looking for. But I can recoup the future of what you are looking for, for example:

(Future(Some(0)), 0)
(Future(None)   , 1)
(Future(None)   , 2)
...

      

Expanding on your example, if a stream is given that cannot be changed:

val flow = Flow[Int].filter(_ % 5 == 0)

      

This stream can then be evaluated on a singular input, and the result is converted to Option

:

import scala.concurrent.{Future, Promise}
import akka.stream.{Materializer, ActorMaterializer}
import akka.stream.scaladsl.{Source,Sink}

def evalFlow(in : Int, flow : Flow[Int, Int, _])(implicit mat : Materializer, ec : ExecutionContext) = {
  val fut : Future[Int] = 
    Source.single(in)
          .via(flow)
          .runWith(Sink.head) //Throws an Exception if filter fails

  fut.map(Some(_))              //       val => Some(val)
     .fallbackTo(Promise(None)) // Exception => None
} 

      

This function returns Future[Option[Int]]

. We can then use an estimate to simply combine the result with the input:



def evalAndCombine(flow : Flow[Int, Int, _])(in : Int)(implicit mat : Materializer, ec : ExecutionContext) =
  (evalFlow(in, flow), in)//(Future[Option[Int]], Int)

      

And finally, the function evalAndCombine

can be placed after your Ints source:

import akka.actor.ActorSystem

implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()
import actorSystem.dispatcher

val exampleSource = Source(() => (1 to 6).toIterator)

val tupleSource = exampleSource map evalAndCombine(flow)

      

Likewise if you want Future[(Option[Int], Int)]

instead of (Future[Option[Int]], Int)

eg .:

Future[(Some(0), 0)]
Future[(None   , 1)]
...

      

Then slightly change the function of the combine:

def evalAndCombine(flow : Flow[Int, Int, _])(in : Int)(implicit mat : Materializer, ec : ExecutionContext) =
  evalFlow(in, flow) map (option => (option, in))//Future[(Option[Int], Int)]

      

+2


source







All Articles