RxJava operators: encapsulating data flows in custom operators

Let's say that I observe the observable in a very specific way.

    resultObservable = anotherObservable.filter(~Filter code~).take(15);  

      

I would like to create a custom operator that combines two predefined operators, such as filter and take, so that it behaves like

    resultObservable = anotherObservable.lift(new FilterAndTake(15));  

      

or...

    resultObservable = anotherObservable.FilterAndTake(15);  

      

So far, I've found that I'm comfortable writing a very specific operator that can do this, and I can lift that operator.

But given my limited knowledge of RxJava, this would mean rewriting the take and filter functionality every time I need it in a custom operator.

Doing this is fine, but I'd rather reuse the pre-existing operators that are maintained by the open source community, as well as reuse operators I've created myself.

Something also tells me that I don't have enough knowledge about operators and subscribers.

Can someone recommend tutorials other than the RxJava documentation?
I ask because, while the docs explain the general concepts, they present each concept in isolation and only in the general context of its functionality, leaving no examples to inspire more robust RxJava applications.

So essentially:

I am trying to encapsulate custom data flows into representative operators. Does this functionality exist?



2 answers


I don't know of any special function (or sugar) that composes Operator objects. But you can simply create a new operator that composes existing operators. Here's a working example of the operator FilterAndTake:

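import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
// Note: these two classes come from RxJava 1.x's internal package, which is
// not part of the public API and may change between releases.
import rx.internal.operators.OperatorFilter;
import rx.internal.operators.OperatorTake;

public class FilterAndTake<T> implements Observable.Operator<T, T> {

    private final OperatorFilter<T> filter;
    private final OperatorTake<T> take;

    public FilterAndTake(Func1<? super T, Boolean> predicate, int n) {
        this.filter = new OperatorFilter<T>(predicate);
        this.take = new OperatorTake<T>(n);
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> child) {
        // Wrap the child in take first, then in filter, so that items
        // flowing downstream are filtered before they are counted.
        return filter.call(take.call(child));
    }
}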

      



And then you can use it like this:

public static void main(String[] args) {
    // Emits 1, 2, ..., 8.
    Observable<Integer> xs = Observable.range(1, 8);

    // Keeps only even numbers.
    Func1<Integer, Boolean> predicate = new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer x) {
            return x % 2 == 0;
        }
    };

    Action1<Integer> action = new Action1<Integer>() {
        @Override
        public void call(Integer x) {
            System.out.println("> " + x);
        }
    };

    // Prints "> 2" and "> 4": the first two even numbers.
    xs.lift(new FilterAndTake<Integer>(predicate, 2)).subscribe(action);
}
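
If the nesting in filter.call(take.call(child)) looks backwards, a library-free model may help. The sketch below does not use RxJava at all: Op, filter, take and filterAndTake are hypothetical stand-ins written only for illustration. Each one maps a downstream consumer to the consumer that upstream should push into. The real operators also handle unsubscription, errors and completion, which this model ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class OperatorNestingSketch {

    // Hypothetical stand-in for Observable.Operator: given the downstream
    // consumer, return the consumer that upstream should push items into.
    interface Op<T> {
        Consumer<T> call(Consumer<T> child);
    }

    // Forwards only the items that match the predicate.
    static <T> Op<T> filter(Predicate<T> predicate) {
        return child -> item -> {
            if (predicate.test(item)) {
                child.accept(item);
            }
        };
    }

    // Forwards the first n items it receives and ignores the rest.
    // (Unlike the real take, this model never unsubscribes from upstream.)
    static <T> Op<T> take(int n) {
        return child -> {
            int[] seen = {0};
            return item -> {
                if (seen[0] < n) {
                    seen[0]++;
                    child.accept(item);
                }
            };
        };
    }

    // Same nesting as in the answer: filter wraps take, take wraps the child.
    static <T> Op<T> filterAndTake(Predicate<T> predicate, int n) {
        Op<T> f = filter(predicate);
        Op<T> t = take(n);
        return child -> f.call(t.call(child));
    }

    // Pushes 1..8 through the composed operator and collects the output.
    static List<Integer> run() {
        List<Integer> out = new ArrayList<>();
        Consumer<Integer> upstream =
            OperatorNestingSketch.<Integer>filterAndTake(x -> x % 2 == 0, 2).call(out::add);
        for (int i = 1; i <= 8; i++) {
            upstream.accept(i);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [2, 4]
    }
}
```

The outermost wrapper is the one that sees each item first, so items are filtered before they are counted. Swapping the nesting to t.call(f.call(child)) would count 1 and 2 first and then let only 2 through.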

      



A bit late to the party, but that's why compose exists:



Observable
    .from(....)
    .flatMap(... -> ....)
    .compose(filterAndTake(15))
    .subscribe(...)

public <T> Transformer<T,T> filterAndTake(int num) {
    return source -> source
        .filter(~Filter code~)
        .take(num);
}

      
