Spring Boot Kafka: Unable to start user due to NoSuchBeanDefinitionException
Using spring boot service when trying to start kafka consumer seeing NoSuchBeanDefinitionException and was unable to start the service on its own.
Below is my bean class which has all the required beans created for Kafka configuration
Spring Boot Version: 1.5.2.RELEASE
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import com.ns.kafka.gateway.dtos.GatewayCallBackMessage;
@Configuration
@EnableKafka
public class GatewayCallbackToPNConsumerConfig {
@Bean
public Map < String, Object > consumerProps() {
Map < String, Object > props = new HashMap < > ();
props.put(null, null);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "gatewaycallbacktopngroup");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
return props;
}
@Bean
public Deserializer < String > stringKeyDeserializer() {
return new StringDeserializer();
}
@Bean
public Deserializer < GatewayCallBackMessage > gatewayCallBackMessageJsonValueDeserializer() {
return new JsonDeserializer < GatewayCallBackMessage > (GatewayCallBackMessage.class);
}
@Bean
public ConsumerFactory < String, GatewayCallBackMessage > consumerFactory() {
return new DefaultKafkaConsumerFactory < > (consumerProps(),
stringKeyDeserializer(), gatewayCallBackMessageJsonValueDeserializer());
}
@Bean
public ConcurrentKafkaListenerContainerFactory < String, GatewayCallBackMessage > kafkaListenerContainerFactory1() {
ConcurrentKafkaListenerContainerFactory < String, GatewayCallBackMessage > factory = new ConcurrentKafkaListenerContainerFactory < > ();
factory.setConcurrency(1);
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Here is the trace of the exception:
org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ConsumerFactory<java.lang.Object, java.lang.Object>' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
at org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1486) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1104) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1066) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:835) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:741) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:467) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1173) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1067) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:513) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:761) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:866) ~[spring-context-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:542) ~[spring-context-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.2.RELEASE.jar:1.5.2.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:737) ~[spring-boot-1.5.2.RELEASE.jar:1.5.2.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:370) ~[spring-boot-1.5.2.RELEASE.jar:1.5.2.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) ~[spring-boot-1.5.2.RELEASE.jar:1.5.2.RELEASE]
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:134) [spring-boot-1.5.2.RELEASE.jar:1.5.2.RELEASE]
at com.ns.services.pn.main.PushNotificationServiceLauncher.main(PushNotificationServiceLauncher.java:28) [bin/:na]
2017-03-24 00:27:04.621 ERROR 51773 --- [ main] o.s.b.d.LoggingFailureAnalysisReporter :
***************************
APPLICATION FAILED TO START
***************************
Description:
Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
- Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found bean 'consumerFactory'
Action:
Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.
source to share
Change the kafkaListenerContainerFactory1
bean name to kafkaListenerContainerFactory
.
The auto-configuration factory looks for ConsumerFactory<Object, Object>
one that doesn't match yours, and
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
Whereas a consumer factory is ...
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
Or disable automatic Kafka configuration.
source to share
1, KafkaListener configuration
@Configuration
@EnableKafka
public class KafkaListenerConfiguration {
@Autowired
private Environment environment;
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("spring.kafka.bootstrap-servers"));
props.put(ConsumerConfig.GROUP_ID_CONFIG, "1111");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
}
2, setting up KafkaListener
@KafkaListener(topics = {KafkaTopicChannel.INPUT_CHANNEL}, containerFactory = "kafkaListenerContainerFactory")
public void listen(List<ConsumerRecord<String, String>> list) {
logger.debug(String.valueOf(list.size()));
}
source to share