Disable Flowable / Observable buffer

I am having trouble always getting the latest value from the combination produced by the combineLatest operator.

I have two hot streams (A and B), each generating events at high frequency (one event every 100 ms):

Flowable<OrderBook> flowA = sourceA.getObservableOrderBook(value);
Flowable<OrderBook> flowB = sourceB.getObservableOrderBook(value);

      

They are combined with combineLatest, and processing each combination takes almost 300 ms:

Flowable<OrderBookCouple> combined = Flowable
        .combineLatest(flowA, flowB, OrderBookCouple::new)
        .observeOn(Schedulers.newThread());

combined.subscribe(bookCouple -> {
    System.out.println("A timestamp: " + bookCouple.aOrderBook.getTimeStamp());
    System.out.println("B timestamp: " + bookCouple.bOrderBook.getTimeStamp());
    Thread.sleep(300); // simulate roughly 300 ms of processing per combination
});

      

After each run of the subscriber, I would like to handle only the most recent combination of emitted events, i.e. (lastA, lastB).

The default behavior of the combined stream is to cache every combination in its own buffer, so the subscriber thread receives combinations that are very old, and the time lag keeps growing.

How do I modify my code to disable this buffer and always get the most recent combination?

1 answer


You can apply onBackpressureLatest to flowA and flowB and use an overload of combineLatest that lets you specify the prefetch amount:

Flowable.combineLatest(
    Arrays.asList(flowA.onBackpressureLatest(), flowB.onBackpressureLatest()),
    a -> new OrderBookCouple((OrderBook)a[0], (OrderBook)a[1]),
    1
)
.onBackpressureLatest()
.observeOn(Schedulers.newThread(), false, 1)

      

Unfortunately, there is no overload that accepts both BiFunction and bufferSize, so you need to go back to casting the array elements.
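If the casts get in the way, they can be hidden in a small adapter. The following is only a sketch using plain java.util.function types; ArrayCombiners and fromBiFunction are hypothetical names, not part of RxJava.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical helper, not part of RxJava: wraps a two-argument function
// so it can be used where a combiner over Object[] is expected.
final class ArrayCombiners {
    private ArrayCombiners() {}

    @SuppressWarnings("unchecked")
    static <A, B, R> Function<Object[], R> fromBiFunction(BiFunction<A, B, R> combiner) {
        return values -> {
            if (values.length != 2) {
                throw new IllegalArgumentException("expected 2 values, got " + values.length);
            }
            // Unchecked casts: the caller guarantees the element types and their order.
            return combiner.apply((A) values[0], (B) values[1]);
        };
    }

    public static void main(String[] args) {
        Function<Object[], String> join =
                fromBiFunction((String a, Integer b) -> a + ":" + b);
        System.out.println(join.apply(new Object[] {"bid", 42})); // prints "bid:42"
    }
}
```

Note that RxJava expects its own Function interface rather than java.util.function.Function, so the result would presumably have to be passed as a method reference (for example, the returned function followed by ::apply) or the helper rewritten against RxJava's interface.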



Edit

Applying the second onBackpressureLatest and limiting the buffer size of observeOn should get you closer to the pattern you want, although combineLatest doesn't target this use case. You probably want a combination of several operators for this.
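As a rough mental model of the "keep only the newest item" behavior that onBackpressureLatest and a buffer size of 1 aim for, here is a plain-Java sketch. It is a conceptual illustration only, not how RxJava implements it; LatestSlot is a hypothetical class.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Conceptual model only, not RxJava's implementation: a one-element slot
// in which a newer value overwrites an older one that was never consumed.
final class LatestSlot<T> {
    private final AtomicReference<T> slot = new AtomicReference<>();

    // Producer side: replace whatever is waiting.
    // Returns true if an unconsumed value was dropped.
    boolean offer(T value) {
        Objects.requireNonNull(value, "value");
        return slot.getAndSet(value) != null;
    }

    // Consumer side: take the pending value, or null if nothing new arrived.
    T poll() {
        return slot.getAndSet(null);
    }

    public static void main(String[] args) {
        LatestSlot<Integer> latest = new LatestSlot<>();
        // Five fast events arrive while the consumer is still busy.
        for (int i = 1; i <= 5; i++) {
            latest.offer(i);
        }
        System.out.println(latest.poll()); // prints 5
        System.out.println(latest.poll()); // prints null
    }
}
```

A slow consumer that polls such a slot after each 300 ms run would see only the newest value and never a backlog, which is the behavior the question asks for.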
