How to unit test Kafka Streams
While researching how to unit test a Kafka Streams flow I came across ProcessorTopologyTestDriver. Unfortunately, this class seems to have been broken since version 0.10.1.0 (KAFKA-4408).
Is there a workaround for the KTable problem?
I saw the "Mocked Streams" project, but first, it uses version 0.10.2.0 while I'm on 0.10.1.1, and second, it is written in Scala while my tests are Java / Groovy.
Any help on how to unit test the flow without having to download ZooKeeper / Kafka would be great.
Note: I already have integration tests that use embedded servers; this question is about unit tests, meaning quick, simple tests.
I found a way to get around this. I'm not sure it is the answer, especially after the comment from https://stackoverflow.com/users/4953079/matthias-j-sax . Anyway, I'm sharing what I have so far...
I copied ProcessorTopologyTestDriver in its entirety from the 0.10.1 branch (the version I am using).
To address KAFKA-4408, I made private final MockConsumer<byte[], byte[]> restoreStateConsumer accessible and moved the snippet task = new StreamTask(... into a separate method, e.g. bootstrap()
...
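To illustrate why that restructuring helps, here is a dependency-free Java sketch of the pattern only. FakeRestoreConsumer, FakeTask and PatchedDriver are hypothetical stand-ins I made up for MockConsumer, StreamTask and the copied driver; none of them are Kafka classes. The sketch assumes the task fails at construction time when the restore consumer has no partition metadata for the store's topic, which is my reading of the problem and not something confirmed here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for MockConsumer: only records what the test registers.
class FakeRestoreConsumer {
    private final Map<String, List<Integer>> partitionsByTopic = new HashMap<>();

    void updatePartitions(String topic, List<Integer> partitions) {
        partitionsByTopic.put(topic, new ArrayList<>(partitions));
    }

    boolean knows(String topic, int partition) {
        List<Integer> partitions = partitionsByTopic.get(topic);
        return partitions != null && partitions.contains(partition);
    }
}

// Hypothetical stand-in for StreamTask: assumed to fail at construction
// when the consumer has no metadata for the store's topic partition.
class FakeTask {
    FakeTask(FakeRestoreConsumer consumer, String storeTopic, int partition) {
        if (!consumer.knows(storeTopic, partition)) {
            throw new IllegalStateException(
                "No partition metadata for " + storeTopic + "-" + partition);
        }
    }
}

// Hypothetical stand-in for the copied driver after the two changes.
class PatchedDriver {
    // Change 1: the consumer is no longer private, so a test can prime it.
    final FakeRestoreConsumer restoreStateConsumer = new FakeRestoreConsumer();
    private final String storeTopic;
    private final int partition;
    private FakeTask task;

    PatchedDriver(String storeTopic, int partition) {
        this.storeTopic = storeTopic;
        this.partition = partition;
        // Change 2: the task is deliberately NOT created in the constructor.
    }

    // Task creation is deferred until the test has primed the consumer.
    void bootstrap() {
        task = new FakeTask(restoreStateConsumer, storeTopic, partition);
    }

    boolean isReady() {
        return task != null;
    }
}
```

With this layout a test constructs the driver, registers the topic on restoreStateConsumer, and only then calls bootstrap(). If the task were still created in the constructor, the test would have no chance to register anything first.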
During the setup phase of my test, I do the following:
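import org.apache.kafka.common.Node
import org.apache.kafka.common.PartitionInfo
import org.apache.kafka.common.TopicPartition
// restoreStateConsumer and bootstrap() exist only in the patched copy of the driver
driver = new ProcessorTopologyTestDriver(config, builder)
// Give the mock restore consumer partition metadata and an end offset for the KTable topic
List<PartitionInfo> partitionInfos = new ArrayList<>()
partitionInfos.add(new PartitionInfo('my_ktable', 1, (Node) null, (Node[]) null, (Node[]) null))
driver.restoreStateConsumer.updatePartitions('my_ktable', partitionInfos)
driver.restoreStateConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition('my_ktable', 1), Long.valueOf(0L)))
driver.bootstrap()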
That's it...
Bonus
I also ran into KAFKA-4461. Luckily, since I had copied the whole class, I was able to "cherry-pick" the accepted fix with a few minor changes.
Feedback is always appreciated. While apparently not an official test class, this driver has proven to be effective!