How to create a unit test using kafka built in spring cloud flow
1 answer
We usually recommend using Test Binder in your tests, but if you want to use the built-in kafka server this can be done ...
Add this to your POM ...
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
Test Application ...
@SpringBootApplication
@EnableBinding(Processor.class)
public class So43330544Application {
public static void main(String[] args) {
SpringApplication.run(So43330544Application.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public byte[] handle(byte[] in){
return new String(in).toUpperCase().getBytes();
}
}
application.properties ...
spring.cloud.stream.bindings.output.destination=so0544out
spring.cloud.stream.bindings.input.destination=so0544in
spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.input.group=so0544
Test version ...
@RunWith(SpringRunner.class)
@SpringBootTest
public class So43330544ApplicationTests {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);
@Autowired
private KafkaTemplate<byte[], byte[]> template;
@Autowired
private KafkaProperties properties;
@BeforeClass
public static void setup() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}
@Test
public void testSendReceive() {
template.send("so0544in", "foo".getBytes());
Map<String, Object> configs = properties.buildConsumerProperties();
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test0544");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(configs);
Consumer<byte[], byte[]> consumer = cf.createConsumer();
consumer.subscribe(Collections.singleton("so0544out"));
ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
consumer.commitSync();
assertThat(records.count()).isEqualTo(1);
assertThat(new String(records.iterator().next().value())).isEqualTo("FOO");
}
}
+9
source to share