How to achieve post-sequence buffering logic in RxJava2?

I want to achieve the following with RxJava, and since I may not have enough knowledge in this area you would like some help :)

I need to create a PublishSubject that will generate events with the following sequence:

  • Emit 1, 2, 3
  • Buffer 4 completes the subscription if a certain condition is not met (could be a network connection, for example, or some other condition).
  • For 5, 6 ... buffers after 4 if condition is not already met.
  • Repeat to emit 4 after a while when the condition is met.
  • When trying to emit 5,6 and the condition is met, instead of buffering 5, 6 ... after 4, we just emit 4 and then 5, 6, 7, 8 ...

The last 2 points are necessary because the emission sequence is really important, which makes it difficult for me to achieve this.

I hope I can describe what I want to achieve :)

Conclusions: After I asked this question, I made some conclusions and achieved the following results:

private Observable observable = publishSubject
        .observeOn(Schedulers.io())
        .map(Manager::callNew)
        .doOnError(throwable -> Logger.e(throwable, "Error occurred"))
        .retryWhen(throwableObservable -> throwableObservable
                .zipWith(Observable.range(1, 10), (n, i) -> i)
                .flatMap(retryCount -> {
                    long retrySeconds = (long) Math.pow(2, retryCount);
                    Logger.d("The call has been failed retrying in %s seconds. Retry count %s", retrySeconds, retryCount);
                    return Observable.timer(retrySeconds, TimeUnit.SECONDS)
                            .doOnNext(aLong -> {
                                C24Logger.d("Timer was completed. %s", aLong);
                            })
                            .doOnComplete(() -> Logger.d("Timer was completed."));
                }));

      

The problem is here at PublishSubject. Since it has already released all the elements, it only emits new ones for retryWhen. If I use ReplaySubject, it also emits old pre-made items for a new retryWhen is re-signed, which I no longer need.

Is there a way to use ReplaySubject to remove completed items from the buffer?

+3


source to share


1 answer


You want to turn buffering on and off based on an external condition. Perhaps the easiest way to do this is to use an operator buffer()

to continuously buffer items based on a condition.

(I removed stuff from the observer chain)

private Observable observable = publishSubject
      .publish( obs -> obs.buffer( obs.filter( v -> externalCondition( v ) ) ) )
      .flatMapIterable( bufferedList -> bufferedList )
      .subscribe( ... );

      



An operator publish()

allows multiple observer chains to subscribe to an incoming observer chain. The operator buffer()

monitors an observable value that emits a value only if the outer condition is true.

When the outer condition is true, it buffer()

will return a series of lists with only one item. When the condition is false, it buffer()

starts buffering the results, and when the condition returns again, all buffered items are emitted as a list. The step flatMapIterable()

will take each element out of the buffer and emit it separately.

0


source







All Articles