How to create a unit test using embedded Kafka in Spring Cloud Stream
Sorry that the question is so general, but does anyone have a tutorial or guide on how to test a producer and consumer with embedded Kafka? I have tried several, but they all use different dependency versions and none of them work. =/
I am using Spring Cloud Stream with Kafka.
+3
Tiago Costa
1 answer
We generally recommend using the test binder in your tests, but if you want to use the embedded Kafka server, it can be done ...
Add this to your POM ...
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
Test Application ...
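import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication
@EnableBinding(Processor.class)
public class So43330544Application {

    public static void main(String[] args) {
        SpringApplication.run(So43330544Application.class, args);
    }

    // Upper-cases each payload received on the input binding and sends it to the output binding.
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public byte[] handle(byte[] in) {
        return new String(in).toUpperCase().getBytes();
    }
}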
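The transformation inside handle does not depend on Kafka, so it can be checked in isolation before a broker is involved. A minimal sketch, assuming a hypothetical helper class UpperCaseTransform (not part of the application above) that mirrors the body of handle, with an explicit charset and locale added so the result does not depend on platform defaults:

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical helper mirroring the body of handle(); not part of the original application.
public class UpperCaseTransform {

    // Decodes the payload as UTF-8, upper-cases it, and re-encodes it as UTF-8.
    public static byte[] apply(byte[] in) {
        return new String(in, StandardCharsets.UTF_8)
                .toUpperCase(Locale.ROOT)
                .getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] out = apply("foo".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(out, StandardCharsets.UTF_8)); // prints FOO
    }
}
```

If the listener delegated to a helper like this, the embedded-broker test would only need to cover the wiring, not the conversion logic.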
application.properties ...
spring.cloud.stream.bindings.output.destination=so0544out
spring.cloud.stream.bindings.input.destination=so0544in
spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.input.group=so0544
Test case ...
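import static org.assertj.core.api.Assertions.assertThat;

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class So43330544ApplicationTests {

    // Starts one embedded broker (and ZooKeeper) for the whole test class.
    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);

    @Autowired
    private KafkaTemplate<byte[], byte[]> template;

    @Autowired
    private KafkaProperties properties;

    // Point Spring Boot and the Kafka binder at the embedded broker before the context starts.
    @BeforeClass
    public static void setup() {
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
        System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
    }

    @Test
    public void testSendReceive() {
        template.send("so0544in", "foo".getBytes());
        // Read the output topic with a separate consumer group, starting from the beginning.
        Map<String, Object> configs = properties.buildConsumerProperties();
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test0544");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(configs);
        Consumer<byte[], byte[]> consumer = cf.createConsumer();
        consumer.subscribe(Collections.singleton("so0544out"));
        ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
        consumer.commitSync();
        consumer.close();
        assertThat(records.count()).isEqualTo(1);
        assertThat(new String(records.iterator().next().value())).isEqualTo("FOO");
    }
}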
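One thing to watch: Spring Boot's auto-configured Kafka producer and consumer default to String serializers and deserializers, while the test above works with byte[] keys and values. A minimal sketch, assuming those defaults apply in your version and that nothing else configures the serializers; the class EmbeddedKafkaTestProps is a hypothetical helper (not part of the answer) that could be called from setup() alongside the existing System.setProperty calls:

```java
// Hypothetical helper; sets Spring Boot Kafka properties so that the
// auto-configured KafkaTemplate and consumer properties use byte[] payloads.
public class EmbeddedKafkaTestProps {

    private static final String SERIALIZER =
            "org.apache.kafka.common.serialization.ByteArraySerializer";
    private static final String DESERIALIZER =
            "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    // Must run before the Spring context starts, e.g. from a @BeforeClass method.
    public static void apply() {
        System.setProperty("spring.kafka.producer.key-serializer", SERIALIZER);
        System.setProperty("spring.kafka.producer.value-serializer", SERIALIZER);
        System.setProperty("spring.kafka.consumer.key-deserializer", DESERIALIZER);
        System.setProperty("spring.kafka.consumer.value-deserializer", DESERIALIZER);
    }
}
```

The same four properties could instead go in application.properties; system properties are used here only to stay consistent with how setup() already passes the broker address.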
+9
Gary Russell