Akka Streams - limiting the rate of a Flow without introducing a delay

I am working with Akka (version 2.4.17) to build an observation Flow in Java (say, of elements of type <T>, to stay generic).

My requirement is that this flow should be limited to a maximum number of observations per unit of time, delivered as soon as they arrive. For example, it should deliver no more than 2 observations per minute (the first ones that arrive; the rest can be dropped).

I have looked very closely at the Akka documentation, and in particular at this page, which details the built-in stages and their semantics.

So far I have tried the following approaches.

  • With throttle in shaping() mode (so as not to close the stream when the limit is exceeded):

      Flow.of(T.class)
           .throttle(2, 
                     new FiniteDuration(1, TimeUnit.MINUTES), 
                     0, 
                     ThrottleMode.shaping())
    
          

  • With groupedWithin and an intermediate custom method:

    final int nbObsMax = 2;
    
    Flow.of(T.class)
        .groupedWithin(Integer.MAX_VALUE, new FiniteDuration(1, TimeUnit.MINUTES))
        .map(list -> {
             // Keep at most the first nbObsMax observations of each window
             List<T> listToTransfer = new ArrayList<>();
             for (int i = 0; i < list.size() && i < nbObsMax; i++) {
                 listToTransfer.add(list.get(i));
             }
             return listToTransfer;
        })
        .mapConcat(elem -> elem)  // Splitting List<T> into a Flow of T objects
    
          

The previous approaches give me the correct number of observations per unit of time, but those observations are kept and only delivered at the end of the time window (and hence there is additional latency).

To give a more specific example, if the following observations arrive in my flow:

[Obs1 t = 0s] [Obs2 t = 45s] [Obs3 t = 47s] [Obs4 t = 121s] [Obs5 t = 122s]

It should only output the following as soon as they arrive (processing time can be neglected here):

Window 1: [Obs1 t ~ 0s] [Obs2 t ~ 45s]
Window 2: [Obs4 t ~ 121s] [Obs5 t ~ 122s]
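To make the extra latency of the groupedWithin attempt concrete, here is a small plain-Java sketch (no Akka involved). It assumes the batching windows are aligned to t = 0 and that a batch is only released when its window ends; the class and method names are made up for illustration.

```java
public class WindowLatencySketch {

    /**
     * Hypothetical helper: the time (in seconds) at which a batching window of
     * length windowSec releases an element that arrived at arrivalSec,
     * assuming windows are aligned to t = 0 and flushed only when they end.
     */
    public static long batchedDeliverySec(long arrivalSec, long windowSec) {
        return (arrivalSec / windowSec + 1) * windowSec;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 45, 47, 121, 122}; // arrival times from the example, in seconds
        for (long t : arrivals) {
            long delivered = batchedDeliverySec(t, 60);
            System.out.println("arrived at " + t + "s, released at " + delivered
                    + "s, added latency " + (delivered - t) + "s");
        }
    }
}
```

Under these assumptions Obs1 would only be released at 60s and Obs4 at 180s, which is the delay I am trying to avoid.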

Any help would be appreciated, thanks for reading my first StackOverflow post;)

+3




3 answers


I can't think of an out-of-the-box solution that does what you want. Throttle will emit at a steady rate because of the way it is implemented with the token bucket model, rather than granting a fixed allowance at the start of each time period.

To get the exact behavior you want, you'll need to create your own rate-limiting stage (which might not be that hard). You can find docs on how to create custom stages here: http://doc.akka.io/docs/akka/2.5.0/java/stream/stream-customize.html#custom-linear-processing-stages-using-graphstage



One design that might work is to keep an allowance counter that says how many elements you may still emit and that is reset every interval. For each incoming element, you subtract one from the counter and emit it; once the allowance is used up, you keep pulling upstream but discard the elements rather than emitting them. Using TimerGraphStageLogic for the GraphStageLogic lets you set up a timed callback that can reset the allowance.
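The counting logic described above can be sketched in plain Java, independent of Akka, to check it against the timestamps from the question. This is only an illustration under assumptions: the class names, the `tryAcquire` method, and the choice to start a new period at the first element that arrives after the previous one has expired are all hypothetical, and a real stage would rely on a timer instead of comparing timestamps.

```java
import java.util.ArrayList;
import java.util.List;

public class AllowanceCounterSketch {

    /** Hypothetical helper: admits at most maxPerWindow elements per period and never delays any. */
    public static final class WindowAllowance {
        private final int maxPerWindow;
        private final long windowMillis;
        private long windowStart = -1; // -1 means no period has been opened yet
        private int used = 0;          // elements already emitted in the current period

        public WindowAllowance(int maxPerWindow, long windowMillis) {
            this.maxPerWindow = maxPerWindow;
            this.windowMillis = windowMillis;
        }

        /** Returns true if the element arriving at nowMillis should be emitted, false if it should be dropped. */
        public boolean tryAcquire(long nowMillis) {
            if (windowStart < 0 || nowMillis - windowStart >= windowMillis) {
                // First element ever, or the previous period has expired: reset the allowance
                windowStart = nowMillis;
                used = 0;
            }
            if (used < maxPerWindow) {
                used++;
                return true;
            }
            return false; // allowance used up: drop without delaying anything
        }
    }

    /** Replays the arrival times from the question (2 observations per minute). */
    public static List<String> simulate() {
        long[] arrivalsSec = {0, 45, 47, 121, 122};
        WindowAllowance allowance = new WindowAllowance(2, 60_000);
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < arrivalsSec.length; i++) {
            if (allowance.tryAcquire(arrivalsSec[i] * 1000)) {
                emitted.add("Obs" + (i + 1));
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // prints [Obs1, Obs2, Obs4, Obs5]
    }
}
```

With these inputs Obs3 is the only element dropped, and every emitted element is passed on at its arrival time.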

+3




I think this is exactly what you need: http://doc.akka.io/docs/akka/2.5.0/java/stream/stream-cookbook.html#Globally_limiting_the_rate_of_a_set_of_streams



+2




Thanks to @johanandren's answer, I have successfully implemented a custom GraphStage that suits my requirements.

I am posting the code below in case anyone is interested:

import akka.stream.Attributes;
import akka.stream.FlowShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.stage.*;
import scala.concurrent.duration.FiniteDuration;

public class CustomThrottleGraphStage<A> extends GraphStage<FlowShape<A, A>> {

    private final FiniteDuration silencePeriod;
    private final int nbElemsMax;

    public CustomThrottleGraphStage(int nbElemsMax, FiniteDuration silencePeriod) {
        this.silencePeriod = silencePeriod;
        this.nbElemsMax = nbElemsMax;
    }

    public final Inlet<A> in = Inlet.create("TimedGate.in");
    public final Outlet<A> out = Outlet.create("TimedGate.out");

    private final FlowShape<A, A> shape = FlowShape.of(in, out);
    @Override
    public FlowShape<A, A> shape() {
        return shape;
    }

    @Override
    public GraphStageLogic createLogic(Attributes inheritedAttributes) {
        return new TimerGraphStageLogic(shape) {

            // Number of observations already forwarded in the current period
            private int countElements = 0;

            {
                setHandler(in, new AbstractInHandler() {
                    @Override
                    public void onPush() throws Exception {
                        A elem = grab(in);
                        if (countElements >= nbElemsMax) {
                            pull(in);  // we drop the incoming observation since the rate limit has been reached
                        }
                        else {
                            if (countElements == 0) { // first observation of a period: we schedule the counter reset
                                scheduleOnce("resetCounter", silencePeriod);
                            }
                            push(out, elem); // we forward the incoming observation
                            countElements += 1; // we increment the counter
                        }
                    }
                });
                setHandler(out, new AbstractOutHandler() {
                    @Override
                    public void onPull() throws Exception {
                        pull(in);
                    }
                });
            }

            @Override
            public void onTimer(Object key) {
                if (key.equals("resetCounter")) {
                    countElements = 0; // a new period starts with the next observation
                }
            }
        };
    }
}

      

0

