How to implement a Kafka consumer in a Spring MVC web application (using Spring Boot)

I would like to create a Kafka consumer inside a Spring MVC web application. Basically, I want the web application to listen to some Kafka topics and take action based on the messages it receives.

All the examples I've seen so far either run as a standalone application with an infinite loop (plain Java) or live inside a unit test (Spring):

@Autowired
private Listener listener;

@Autowired
private KafkaTemplate<Integer, String> template;

@Test
public void testSimple() throws Exception {
    template.send("annotated1", 0, "foo");
    template.flush();
    assertTrue(this.listener.latch1.await(10, TimeUnit.SECONDS));
}

@Configuration
@EnableKafka
public class Config {

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

public class Listener {

    private final CountDownLatch latch1 = new CountDownLatch(1);

    @KafkaListener(id = "foo", topics = "annotated1")
    public void listen1(String foo) {
        this.latch1.countDown();
    }

}
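
For reference, the latch is what lets the test thread wait for the asynchronous listener. Here is a dependency-free sketch of that hand-off as I understand it. A plain thread stands in for the listener container and no Kafka is involved; `LatchDemo` and `deliverAndAwait` are names made up for this illustration, not part of any library:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchDemo {

    // Stand-in for the Listener above: counts the latch down when a message arrives.
    public static class Listener {
        final CountDownLatch latch1 = new CountDownLatch(1);

        void listen1(String foo) {
            latch1.countDown();
        }
    }

    // Hypothetical helper: hands one message to the listener from a background
    // thread (playing the role of the listener container) and waits for it.
    // Returns true if the listener was invoked before the timeout elapsed.
    public static boolean deliverAndAwait(String message, long timeoutMillis) {
        Listener listener = new Listener();
        Thread container = new Thread(() -> listener.listen1(message));
        container.start();
        try {
            return listener.latch1.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that the wait did not complete.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(deliverAndAwait("foo", 10_000));
    }
}
```

In a web application there would be no test thread waiting on the latch; the listener method would do the actual work instead of counting down.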

      

What would be the best place to create a listener? Should I declare it in the application class, like this:

@SpringBootApplication
public class Application {

    @Autowired
    private Listener listener;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

java spring spring-boot model-view-controller apache-kafka

