Spring kafka integration creating consumers dynamically

I am using Spring-Integration-Kafka and below is a sample for dynamically creating consumers to receive and print messages to the console. Consumer class:

public class Consumer1 {
private static final String CONFIG = "kafkaInboundMDCAdapterParserTests-context.xml";
static ClassPathXmlApplicationContext ctx;

public static void main(final String args[]) {
    ctx = new ClassPathXmlApplicationContext(CONFIG, Consumer1.class);
    ctx.start();
    addConsumer("test19", "default8");

    ctx = new ClassPathXmlApplicationContext(CONFIG, Consumer1.class);
    ctx.start();
    addConsumer("test19", "default10");

}

public static void addConsumer(String topicId, String groupId) {

    MessageChannel inputChannel = ctx.getBean("inputFromKafka", MessageChannel.class);

    ServiceActivatingHandler serviceActivator = new ServiceActivatingHandler(new MessageReceiver(), "processMessage");
    ((SubscribableChannel) inputChannel).subscribe(serviceActivator);

    KafkaConsumerContext<String, String> kafkaConsumerContext = ctx.getBean("consumerContext", KafkaConsumerContext.class);
    try {
        TopicFilterConfiguration topicFilterConfiguration = new TopicFilterConfiguration(topicId, 1, false);

        ConsumerMetadata<String,String> consumerMetadata = new ConsumerMetadata<String, String>();
        consumerMetadata.setGroupId(groupId);
        consumerMetadata.setTopicFilterConfiguration(topicFilterConfiguration);
        consumerMetadata.setConsumerTimeout("1000");
        consumerMetadata.setKeyDecoder(new AvroReflectDatumBackedKafkaDecoder<String>(java.lang.String.class));
        consumerMetadata.setValueDecoder(new AvroReflectDatumBackedKafkaDecoder<String>(java.lang.String.class));


        ZookeeperConnect zkConnect = ctx.getBean("zookeeperConnect", ZookeeperConnect.class);

        ConsumerConfigFactoryBean<String, String> consumer = new ConsumerConfigFactoryBean<String, String>(consumerMetadata,
                zkConnect);

        ConsumerConnectionProvider consumerConnectionProvider = new ConsumerConnectionProvider(consumer.getObject());
        MessageLeftOverTracker<String,String> messageLeftOverTracker = new MessageLeftOverTracker<String, String>();
        ConsumerConfiguration<String, String> consumerConfiguration = new ConsumerConfiguration<String, String>(consumerMetadata, consumerConnectionProvider, messageLeftOverTracker);

        kafkaConsumerContext.getConsumerConfigurations().put(groupId, consumerConfiguration);
    } catch (Exception exp) {
        exp.printStackTrace();
    }
}

      

}

incoming config file:

<int:channel id="inputFromKafka"/>

<int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="localhost:2181"
        zk-connection-timeout="6000"
        zk-session-timeout="6000"
        zk-sync-time="2000"/>

<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
        kafka-consumer-context-ref="consumerContext"
        auto-startup="false"
        channel="inputFromKafka">
    <int:poller fixed-delay="1" time-unit="MILLISECONDS"/>
</int-kafka:inbound-channel-adapter>

<bean id="kafkaReflectionDecoder" class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaDecoder">
    <constructor-arg type="java.lang.Class" value="java.lang.String"/>
</bean>

<int-kafka:consumer-context id="consumerContext"
        consumer-timeout="1000"
        zookeeper-connect="zookeeperConnect">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration group-id="default1"
                value-decoder="kafkaReflectionDecoder"
                key-decoder="kafkaReflectionDecoder"
                max-messages="5000">
            <int-kafka:topic id="mdc1" streams="1"/>
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

      

When I post any message to topic "test19" the configured "processMessage" method of the ServiceActivator displays two messages configured by two clients, but the question is here: I need to load the incoming config file for each client before adding it to the consumer context. I only get one message in my console. Is this correct or do I need to change something?

Thank.

+3


source to share


1 answer


It's not at all clear what you are trying to do, but you have problems.

By running the context before subscribing to your consumer, you might get into trouble (the dispatcher does not have a subscriber inputFromKafka

for the short time between starting and subscribing).



Why are you programmatically creating a service activator rather than declaring it in context?

It would be better to configure everything in context (you can pass properties such as groupId

to the context via properties in the environment and use the property placeholder configurator.

0


source







All Articles