How to add a custom operator to the DataStream API

I want to implement an operator that has two input streams and takes one element from each stream so that both are processed together, e.g., as in a join. Also, if either input has no data, the operator should block and wait for it.

If I want to do this, which classes are involved? A tutorial on this would be even better. Any suggestion would be appreciated!



You need to connect the two DataStreams and apply a TwoInputStreamOperator. There are already many predefined operators; in your case, a CoFlatMapFunction would be a good choice:

DataStream<IN1> input1 = ...
DataStream<IN2> input2 = ...

input1.connect(input2).flatMap(new MyOwnCoFlatMapFunction());

More details here: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#co-operators



However, this operator cannot block the way you want it to. Instead, apply the following pattern: every time an element arrives on the left or the right input, buffer it if no element from the other side is available yet; otherwise, take the oldest buffered element from the other side and process the two together:

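import java.util.LinkedList;
import java.util.Queue;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class MyOwnCoFlatMapFunction<IN1, IN2, OUT> implements CoFlatMapFunction<IN1, IN2, OUT> {
    // Elements still waiting for a partner (plain fields, so they are not checkpointed)
    private final Queue<IN1> leftInput = new LinkedList<>();
    private final Queue<IN2> rightInput = new LinkedList<>();

    @Override
    public void flatMap1(IN1 value, Collector<OUT> out) throws Exception {
        if (!rightInput.isEmpty()) {
            IN2 right = rightInput.poll();
            // process left input (value) and right input (right) together, emit via out.collect(...)
        } else {
            leftInput.add(value);
        }
    }

    @Override
    public void flatMap2(IN2 value, Collector<OUT> out) throws Exception { // reverse pattern of flatMap1
        if (!leftInput.isEmpty()) {
            IN1 left = leftInput.poll();
            // process left input (left) and right input (value) together, emit via out.collect(...)
        } else {
            rightInput.add(value);
        }
    }
}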
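To see the pairing logic in isolation, here is a minimal plain-Java sketch with no Flink dependency. The class PairingBuffer and its methods onLeft/onRight are hypothetical stand-ins for flatMap1/flatMap2, and the combine function stands in for "process left and right input together". It is only a model of the buffering pattern, not Flink code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.BiFunction;

// Hypothetical, Flink-free model of the buffering pattern: an element is paired
// with the oldest waiting element of the other side, or buffered (FIFO) if none.
class PairingBuffer<L, R, OUT> {
    private final Queue<L> leftBuffer = new ArrayDeque<>();   // ArrayDeque rejects nulls
    private final Queue<R> rightBuffer = new ArrayDeque<>();
    private final BiFunction<L, R, OUT> combine;
    private final List<OUT> output = new ArrayList<>();

    PairingBuffer(BiFunction<L, R, OUT> combine) {
        this.combine = combine;
    }

    // Plays the role of flatMap1.
    void onLeft(L value) {
        R right = rightBuffer.poll();           // null if nothing is waiting
        if (right != null) {
            output.add(combine.apply(value, right));
        } else {
            leftBuffer.add(value);
        }
    }

    // Plays the role of flatMap2: the mirror image of onLeft.
    void onRight(R value) {
        L left = leftBuffer.poll();
        if (left != null) {
            output.add(combine.apply(left, value));
        } else {
            rightBuffer.add(value);
        }
    }

    List<OUT> output() { return output; }
    int pendingLeft() { return leftBuffer.size(); }
    int pendingRight() { return rightBuffer.size(); }

    public static void main(String[] args) {
        PairingBuffer<String, Integer, String> p = new PairingBuffer<>((l, r) -> l + r);
        p.onLeft("a");
        p.onLeft("b");
        p.onRight(1);   // pairs with "a"
        p.onRight(2);   // pairs with "b"
        p.onRight(3);   // nothing waiting on the left: buffered
        System.out.println(p.output());        // [a1, b2]
        System.out.println(p.pendingRight());  // 1
    }
}
```

The demo interleaves the two sides by hand. In Flink, the runtime decides whether flatMap1 or flatMap2 is called next, so pairing is FIFO per input, but the interleaving is not under your control, and whichever side runs ahead simply accumulates in its buffer.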

      

However, be aware that blocking is dangerous in stream processing. If your input streams have different data rates, this approach will not work (!): the slower stream limits the speed of the operator, which causes back pressure on the faster one (and with the buffering pattern above, the buffer for the faster side grows without bound). I don't know your use case, but something about this design seems off. Why can't you use a time-based (windowed) join instead?
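As a rough illustration of what a time-based join computes, here is a plain-Java model of a keyed tumbling-window inner join. It has no Flink dependency, and WindowJoinSketch, Event and join are hypothetical names chosen for this sketch. Two elements match only if they share a key and fall into the same window, so neither input has to wait for the other element by element, and the two sides may arrive at different rates.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of a keyed tumbling-window inner join.
class WindowJoinSketch {
    // An element with a join key, an event timestamp (assumed >= 0) and a payload.
    static final class Event {
        final String key;
        final long timestamp;
        final String value;

        Event(String key, long timestamp, String value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Tumbling windows [0, size), [size, 2*size), ... identified by their index.
    static long windowOf(long timestamp, long size) {
        return timestamp / size;
    }

    // Emits "left-right" for every pair with the same key in the same window.
    static List<String> join(List<Event> left, List<Event> right, long windowSize) {
        // Buffer the right side per (key, window) slot.
        Map<String, List<Event>> rightBySlot = new HashMap<>();
        for (Event r : right) {
            String slot = r.key + "@" + windowOf(r.timestamp, windowSize);
            rightBySlot.computeIfAbsent(slot, s -> new ArrayList<>()).add(r);
        }
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            String slot = l.key + "@" + windowOf(l.timestamp, windowSize);
            for (Event r : rightBySlot.getOrDefault(slot, List.of())) {
                out.add(l.value + "-" + r.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Two left elements vs. four right elements: the rates differ, nothing blocks.
        List<Event> left = List.of(new Event("k1", 1, "a"), new Event("k1", 12, "b"));
        List<Event> right = List.of(
                new Event("k1", 3, "x"), new Event("k1", 4, "y"),
                new Event("k2", 5, "z"), new Event("k1", 15, "w"));
        System.out.println(join(left, right, 10)); // [a-x, a-y, b-w]
    }
}
```

In Flink itself this roughly corresponds to a window join (input1.join(input2).where(...).equalTo(...).window(...).apply(...)). Because the state of a window can be discarded once that window has fired, the buffered data stays bounded, unlike the open-ended buffers of the element-by-element pairing approach.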
