How to send data to and receive data from Kafka in one API call

I would like to send data to Kafka and get the result back in a single API call (see the diagram below).

[diagram: web app → Kafka → Spark Streaming → Kafka → web app]

Is it possible? I already know how to move data in one direction (e.g. Spark Streaming reading from Kafka through the consumer API). I also know how to "fake" it by combining two one-way flows (e.g. the web application acting as both producer and consumer). However, when the web app makes an API call, it should receive only the response to its own request, not every message on the topic, so this seems like the wrong approach.

Other suboptimal approaches I thought of:

  • Have Spark Streaming store its result in a database, and have the web application poll the database until the result appears. I am worried that this would consume a lot of resources and add latency to the response.
  • Create a short-lived / transient consumer every time the Kafka producer is called. The transient consumer would filter out every record except the one it is looking for, then disconnect once it finds it (see the sketch after this list). I don't think this will work, because the record for the API caller's request might land on a partition that consumer does not own, so it would never be found.
  • Create a temporary topic for each web application API call. I'm not sure whether Kafka will complain about having too many topics.
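For illustration, here is a minimal sketch of the transient-consumer idea from the second bullet, written against the newer org.apache.kafka.clients consumer API. The broker address, the "responses" topic, and the correlation-key scheme are assumptions, not anything from an actual system. It also shows where the partition concern bites: if the transient consumers share a group.id, the response topic's partitions are split among them, so a given consumer may never own the partition its reply lands on.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TransientConsumerSketch {

    // Hypothetical helper: poll the response topic until a record keyed with
    // our correlation id shows up, or give up after timeoutMs.
    public static String awaitReply(String correlationId, long timeoutMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "transient-responders");      // shared group (see note above)
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        long deadline = System.currentTimeMillis() + timeoutMs;
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // With a shared group.id the partitions of "responses" are divided
            // among the live transient consumers; a reply keyed to a partition
            // this consumer does not own will never be seen -- the concern above.
            consumer.subscribe(Collections.singletonList("responses"));
            while (System.currentTimeMillis() < deadline) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    if (correlationId.equals(record.key())) {
                        return record.value();               // found our own reply
                    }
                    // records belonging to other callers are filtered out and skipped
                }
            }
        }
        return null; // timed out; try-with-resources disconnects the consumer
    }
}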

Any advice?



2 answers


What I did was:

  • Create a synProducer that sends the data with a key and creates a consumer for a temporary topic whose name is the key of the sent message.
  • A synConsumer then processes the message and replies to the topic on which the consumer from step 1 is waiting.
  • Delete the temporary topic.


The disadvantage of this approach is that the temporary topics are not deleted immediately.
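For what it's worth, here is a minimal sketch of those three steps, using the modern producer/consumer/AdminClient APIs rather than the 0.8 API shown in the answer below. The broker address, the "requests" topic, and the sendAndWait helper are assumptions for illustration, not the answerer's actual code.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SynRequestReplySketch {

    public static String sendAndWait(String requestPayload, long timeoutMs) throws Exception {
        // Step 1: the key doubles as the name of the per-call reply topic.
        String key = "reply-" + UUID.randomUUID();

        Properties common = new Properties();
        common.put("bootstrap.servers", "localhost:9092"); // assumed broker address

        Properties consumerProps = new Properties();
        consumerProps.putAll(common);
        consumerProps.put("group.id", key);
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.putAll(common);
        producerProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        String reply = null;
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {

            // Subscribe to the reply topic *before* sending, so the reply is not
            // missed; assumes the broker auto-creates topics on first use.
            consumer.subscribe(Collections.singletonList(key));

            // Send the request, keyed so the processing side knows where to reply.
            producer.send(new ProducerRecord<>("requests", key, requestPayload)).get();

            // Step 2 happens elsewhere: the processor consumes "requests" and
            // produces its result to the topic named after the key.
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (reply == null && System.currentTimeMillis() < deadline) {
                for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofMillis(500))) {
                    reply = r.value();
                }
            }
        }

        // Step 3: delete the temporary topic. Deletion is asynchronous, which is
        // exactly the "not deleted immediately" drawback noted above.
        try (AdminClient admin = AdminClient.create(common)) {
            admin.deleteTopics(Collections.singletonList(key)).all().get();
        }
        return reply;
    }
}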



I suggest your third option, but with two topics: one for the request and one for the response. Here is an example:



import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Uses the old (0.8.x) high-level consumer and producer APIs.
public class ConsumerGroupExample extends Thread {

    private final ConsumerConnector consumer;
    private final String topic;
    private ConsumerIterator<byte[], byte[]> it;
    private String mensaje = "";

    public ConsumerGroupExample(Properties props, String a_topic)
    {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = a_topic;

        // Ask for a single stream on the response topic and keep its iterator.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        KafkaStream<byte[], byte[]> stream = streams.get(0);
        it = stream.iterator();
    }

    public void shutdown()
    {
        if (consumer != null) consumer.shutdown();
    }

    public void run()
    {
        try {
            // Block until the first response arrives.
            if (it.hasNext())
            {
                mensaje = new String(it.next().message());
            }
        } catch (ConsumerTimeoutException cte) {
            // No response within consumer.timeout.ms; leave mensaje empty.
        }
        System.out.println( mensaje );
    }

    public String getMensaje()
    {
        return this.mensaje;
    }

    public static void main(String[] args) {

        // Consumer configuration: waits on the response topic.
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "Group");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // Makes it.hasNext() throw ConsumerTimeoutException after 10 s with no data.
        props.put("consumer.timeout.ms", "10000");

        ConsumerGroupExample example = new ConsumerGroupExample( props, "topicForResponse");

        // Producer configuration: writes the request.
        props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        // Start listening for the response *before* sending the request.
        example.start();
        try {

            Producer<String, String> colaParaEscritura;
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("topicForRequest", " message ");
            colaParaEscritura = new kafka.javaapi.producer.Producer<String, String>(config);
            colaParaEscritura.send(data);
            System.out.println("sent");
            colaParaEscritura.close();

            // Wait for the consumer thread to receive the response.
            example.join();

            System.out.println( "final: " + example.getMensaje() );

        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        example.shutdown();
    }

}
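A note on the flow above: the consumer thread starts first and blocks on topicForResponse; the main thread then produces the request to topicForRequest and joins until the response arrives. Because consumer.timeout.ms is set, the wait is bounded at 10 seconds, so a call with no response cannot hang forever.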

      
