Akka Stream connects to multiple receivers

I have implemented a custom component in akka stream that takes elements as input, groups and merges them based on a key, and sends them through one of a dozen outlets. You can think of this component as a kind of GroupBy component that does not split the stream into sub-streams, but into actual streams. In addition to splitting the incoming elements, it concatenates them into one element, that is, there is some buffering going on inside the buffer, so that one element inside does not necessarily mean that one element is going out through the output.

Below is a simplified implementation of the mentioned component.

class CustomGroupBy[A,B](k: Int, f: A => Int) extends GraphStage[FlowShape[B, B]] {

  val in = Inlet[A]("CustomGroupBy.in")
  val outs = (0 until k).map(i => Outlet[B](s"CustomGroupBy.$i.out"))

  override val shape = new AmorphousShape(scala.collection.immutable.Seq(in), outs)

  /* ... */
}

      

Now I'm going to plug each outlet of this component into a different sink and combine the materialized value of all of these sinks.

I have tried several things with DSL graph but I was unable to get it to work. Would anyone be so kind as to provide me with a snippet to do this or point me in the right direction?

Thanks in advance!

+3


source to share


2 answers


You most likely want an inline broadcast . A usage example can be found here :



val bcast = builder.add(Broadcast[Int](2))

in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
            bcast ~> f4 ~> merge

      

+4


source


You probably want akka.stream.scaladsl.Partition[T](outputPorts: Int, partitioner: T β‡’ Int)

stage .

EDIT:

In order to connect all ports and store the materialized values, you must specify your stages as parameters to the method GraphDSL.create

.

This allows you to define a combiner for materialized values ​​and add steps to yours GraphDSLBuilder

as parameters for the last argument. Note that this overloaded method create

does not take a varargs parameter, so it might not be possible for 14 different stages to be handled this way.



Assuming some names for your stages, here's how I would do it, in the case of three outputs:

val runnable = RunnableGraph.fromGraph(
  GraphDSL.create(
    source, customGroupBy, sink1, sink2, sink3)(combiner) {  //the combiner is the function to combine the materialized values
      implicit b => //this is the builder, needed as implicit to make the connections 
      (src, cgb, s1, s2, s3) => //here are the stages added to the builder
      import GraphDSL.Implicits._

      src.out ~> cgb.in
      List(s1, s2, s3).map(_.in).zip(cgb.outlets).foreach{
        case (in, out) => in ~> out
      }

      ClosedShape
    }
  )
)

      

Remember, if you don't need one of the materialized stage value, you can simply add it inside the DSL by doing val cgb = b.add(customGroupBy)

+1


source







All Articles