How to stop a streaming job's thread programmatically

I'm trying to create a JUnit test for a Flink streaming job that writes data to a Kafka topic and reads data back from the same topic, using FlinkKafkaProducer09 and FlinkKafkaConsumer09 respectively. I pass test data into the stream:

DataStream<String> stream = env.fromElements("tom", "jerry", "bill");


and check whether the same data comes out of the consumer:

List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result =  resultSink.getResult();
assertEquals(expected, result);

with the help of a TestListResultSink.

By printing the stream, I can see the data coming from the consumer as expected. But the JUnit test never produces a result, because the consumer keeps running even after all messages have been consumed, so execution never reaches the assertion.

Is there a way in Flink, or in FlinkKafkaConsumer09 itself, to stop the process, or to run it only for a specific time?


3 answers


The main problem is that streaming programs are usually not finite and run indefinitely.

The best way, at least for now, is to insert a special control message into your stream that lets the source shut down cleanly (it simply stops reading more data and leaves its read loop). That way, Flink signals to all downstream operators that they can stop once they have processed all remaining data.
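One way to sketch such a stoppable source: a custom SourceFunction fed from a queue that stops when it sees the control message. The class name, the STOP sentinel, and the queue feed here are all illustrative choices, not part of the original answer.

```java
import java.util.concurrent.BlockingQueue;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * A source that emits records until it sees a special control message,
 * then leaves its read loop so the whole job can finish cleanly.
 */
public class StoppableSource implements SourceFunction<String> {

    public static final String STOP_MARKER = "STOP";

    private final BlockingQueue<String> input;
    private volatile boolean running = true;

    public StoppableSource(BlockingQueue<String> input) {
        this.input = input;
    }

    /** True if the record is the control message that ends the stream. */
    public static boolean isControlMessage(String record) {
        return STOP_MARKER.equals(record);
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String record = input.take();
            if (isControlMessage(record)) {
                break; // stop reading; downstream operators drain and finish
            }
            ctx.collect(record);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

When run returns, Flink treats the source as finished and propagates end-of-stream to all downstream operators, so the job terminates normally.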



Alternatively, you can throw a special exception in your source (after some timeout, for example) so that you can distinguish a "proper" termination from a failure (by checking the cause of the error). Throwing an exception in the source makes the program fail.
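A sketch of that exception approach, using a marker exception so the test can tell a deliberate stop from a real failure. The names SuccessException and FiniteThenFailSource are made up for this example.

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/** Marker exception signalling an intentional shutdown, not a real error. */
class SuccessException extends Exception {}

/** Emits a fixed set of elements, then fails on purpose to end the job. */
class FiniteThenFailSource implements SourceFunction<String> {

    private final String[] elements;

    FiniteThenFailSource(String... elements) {
        this.elements = elements;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        for (String element : elements) {
            ctx.collect(element);
        }
        // Deliberate failure: the test distinguishes this from a genuine
        // error by inspecting the cause chain of the job's exception.
        throw new SuccessException();
    }

    @Override
    public void cancel() {}
}
```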



Following up on @TillRohrmann's answer:

You can combine the custom-exception approach with your unit test: if you use an EmbeddedKafka instance, handle the exception in the test, then read from the EmbeddedKafka topic and assert on the consumed values.



I found https://github.com/asmaier/mini-kafka/blob/master/src/test/java/de/am/KafkaProducerIT.java to be extremely helpful in this regard.

The only problem is that you lose the element that triggers the exception, but you can always adjust your test data to account for this.
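The test-side handling can be sketched like this: run the job, expect it to fail, and check whether a marker exception (here called SuccessException, a hypothetical name) appears anywhere in the cause chain, since Flink wraps source exceptions before surfacing them from env.execute().

```java
/** Marker exception that a test source throws to end the job on purpose. */
class SuccessException extends Exception {}

class JobResultCheck {
    /**
     * Walks the cause chain of the throwable surfaced by env.execute()
     * and reports whether the job was stopped deliberately.
     */
    static boolean stoppedDeliberately(Throwable t) {
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
            if (cause instanceof SuccessException) {
                return true;
            }
        }
        return false;
    }
}
```

In the JUnit test, wrap env.execute() in a try/catch, assert stoppedDeliberately(e) in the catch block, and only then assert on the values read back from the EmbeddedKafka topic.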



Can't you override isEndOfStream in the DeserializationSchema to stop fetching from Kafka? If I read it correctly, Flink's Kafka09Fetcher has the following code in its run method that breaks the event loop:

    if (deserializer.isEndOfStream(value)) {
        // end of stream signaled
        running = false;
        break;
    }


My thought was to use Till Rohrmann's idea of a control message in conjunction with this isEndOfStream method to tell the KafkaConsumer to stop reading.

Any reason that won't work? Or maybe some corner cases that I am missing?

https://github.com/apache/flink/blob/07de86559d64f375d4a2df46d320fc0f5791b562/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka09Fetcher.java#L146
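A sketch of that idea: extend SimpleStringSchema and override isEndOfStream to react to a sentinel record written at the end of the test data. The StoppingStringSchema name and the END_OF_TEST sentinel are my own choices.

```java
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * String schema that signals end-of-stream when a sentinel record
 * arrives, which makes the Kafka fetcher leave its fetch loop.
 */
public class StoppingStringSchema extends SimpleStringSchema {

    public static final String SENTINEL = "END_OF_TEST";

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Kafka09Fetcher checks this after deserializing each record
        // and stops fetching as soon as it returns true.
        return SENTINEL.equals(nextElement);
    }
}
```

In the test, the producer writes the normal test data followed by the sentinel, and the consumer is created with new FlinkKafkaConsumer09<>(topic, new StoppingStringSchema(), props).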


