How to unit test Kafka Streams

While researching how to unit test a Kafka flow I came across ProcessorTopologyTestDriver

, unfortunately this class seems to have broken with version 0.10.1.0

( KAFKA-4408 )

Is there a problem with KTable?

I saw the "Mocked Streams" project, but first it uses the version 0.10.2.0

and I'm on 0.10.1.1

, and the second is Scala and my tests are Java / Groovy.

Any help here on how to unit test the flow without having to download zookeeper / kafka would be great.

Note. I have integration tests that use embedded servers, this is for unit tests as well as quick simple tests.

+3


source to share


1 answer


I found a way to get around this, I'm not sure if this is the answer, especially after the comment https://stackoverflow.com/users/4953079/matthias-j-sax . Anyway, share what I have so far ...

I copied completely ProcessorTopologyTestDriver

from the 0.10.1 branch (this version I am using).

To address KAFKA-4408 , I made private final MockConsumer<byte[], byte[]> restoreStateConsumer

available and moved the snippet task = new StreamTask(...

into a separate method, eg. bootstrap

...

During the setup phase of my test, I do the following

driver = new ProcessorTopologyTestDriver(config, builder)
ArrayList partitionInfos = new ArrayList();
partitionInfos.add(new PartitionInfo('my_ktable', 1, (Node) null, (Node[]) null, (Node[]) null));
driver.restoreStateConsumer.updatePartitions('my_ktable', partitionInfos);
driver.restoreStateConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition('my_ktable', 1), Long.valueOf(0L)));
driver.bootstrap()

      



What is it...

Bonus

I also ran into KAFKA-4461 , luckily since I copied the whole class I was able to "cherry pick" it accepted the fix with a few minor changes.

Feedback is always appreciated. While apparently not an official test class, this driver has proven to be effective!

+2


source







All Articles