Disable fluent / watched buffer
I am having problems getting the always last value from the combination obtained with the combLatest operator.
I have 2 hot streams (a, b) generating events at high frequency (event every 100ms each):
Flowable<OrderBook> flowA = sourceA.getObservableOrderBook(value);
Flowable<OrderBook> flowB = sourceB.getObservableOrderBook(value);
combined with combLatest, which takes almost 300ms to get the job done.
Flowable<OrderBookCouple> combined = Flowable.combineLatest(flowA, flowB, OrderBookCouple::new).observeOn(Schedulers.newThread());
combined.subscribe((bookCouple) -> {
System.out.println("A timestamp: " + bookCouple.aOrderBook.getTimeStamp());
System.out.println("B timestamp: " + bookCouple.bOrderBook.getTimeStamp());
Thread.sleep(300);
}
After one run of the combiner, I would like to handle the most recent combination of the event thrown, which means (lastA, lastB).
The default behavior of the combined stream is to cache the entire combination of events in its own buffer, so that the combined thread gets combinations that are very old, and this time gap explodes.
How do I modify my code to disable this buffer and always get the most recent combinator?
source to share
You can apply onBackpressureLatest
for flowA
and flowB
and use an overloadcombineLatest
that lets you specify the amount of prefetch.
Flowable.combineLatest(
Arrays.asList(flowA.onBackpressureLatest(), flowB.onBackpressureLatest()),
a -> new OrderBookCouple((OrderBook)a[0], (OrderBook)a[1]),
1
)
.onBackpressureLatest()
.observeOn(Schedulers.newThread(), false, 1)
Unfortunately, there is no overload that accepts both BiFunction and bufferSize, so you need to go back to casting the array elements.
Edit
Applying the second onBackpressureLatest
and limiting the buffer size observeOn
shouldn't get you any closer to the pattern you want, although it combineLatest
doesn't target this use case. You probably want multiple statements with multiple patterns.
source to share