Kafka Connect - audit - fire event when tasks are completed

We are building an exception management tool with Kafka. A source connector pulls records from a physical file. On the other side, a sink connector (mongodb-sinkconnect) pulls the entries from the topic and writes them to MongoDB. Everything works fine.

We need to capture events (for audit purposes) in a separate topic, such as:

  • Source task (file polling task) start event, for example when file A is received
  • Source task (file polling task) end event, for example when file A is fully processed
  • Sink task (pushing records into MongoDB) start event, for example when the records of file A start being written by the MongoDB sink connector
  • Sink task (pushing records into MongoDB) end event, for example when file A has been completely written to MongoDB

I have a couple of questions:

  1. We can dispatch events to a different topic by creating a KafkaProducer instance inside the SourceTask and sending the event as soon as the file is fully processed:

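import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class FileSourceTask extends SourceTask {
    private Producer<Key, Event> auditProducer;

    @Override
    public void start(Map<String, String> props) {
        // auditProps holds the producer configuration for the audit topic
        auditProducer = new KafkaProducer<Key, Event>(auditProps);
    }

    @Override
    public List<SourceRecord> poll() {
        List<SourceRecord> results = this.filePoller.poll();
        // An empty poll is treated as "the current file is fully processed"
        if (results.isEmpty() && eventNotSentForCurrentFile) {
            Event event = new Event();
            auditProducer.send(
                new ProducerRecord<Key, Event>(this.props.get("event.topic"), key, event));
            // Avoid sending the same end event again on the next empty poll
            eventNotSentForCurrentFile = false;
        }
        // further processing
        return results;
    }
}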

      

Is the above the correct approach?

  2. The above solution works well on the source side, since it runs as a single task (tasks.max = 1), but for our use case doing the same in the sink task (the MongoDB sink connector) is very difficult. Since the topic is partitioned, many sink tasks will be created, and we cannot keep track of the start event and end event of the sink work.

Please suggest an approach to solve this problem.

Thank you very much.



1 answer


I think you can build something around the Kafka Connect REST API:

https://docs.confluent.io/current/connect/restapi.html#get--connectors-(string-name)-status



However, you would need to monitor the status of the connector yourself and, once all of the connector's tasks have completed, take the audit action.
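
As a rough illustration of that monitoring idea, here is a minimal sketch in Java that fetches GET /connectors/{name}/status and extracts the per-task states. The class name ConnectorStatusCheck, its method names, the base URL and the connector name are all hypothetical. It assumes the "tasks" array comes after the "connector" object in the response body, as in the documented example, and it uses a regex only to stay dependency-free; a real tool should use a JSON parser. Note also that the status endpoint reports task states such as RUNNING, PAUSED, FAILED or UNASSIGNED, and a healthy sink task normally stays RUNNING, so what counts as "completed" for a given file is something you would have to define yourself.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka Connect.
final class ConnectorStatusCheck {

    // Matches a "state": "SOMETHING" field in the status response body.
    private static final Pattern STATE =
            Pattern.compile("\"state\"\\s*:\\s*\"([A-Z_]+)\"");

    /** Returns the state of every task listed after the "tasks" key. */
    static List<String> taskStates(String statusJson) {
        List<String> states = new ArrayList<>();
        int tasksAt = statusJson.indexOf("\"tasks\"");
        if (tasksAt < 0) {
            return states; // no tasks section in the response
        }
        // Assumption: everything after "tasks" belongs to the tasks array.
        Matcher m = STATE.matcher(statusJson.substring(tasksAt));
        while (m.find()) {
            states.add(m.group(1));
        }
        return states;
    }

    /** True only if at least one task is listed and all tasks are in the expected state. */
    static boolean allTasksInState(String statusJson, String expected) {
        List<String> states = taskStates(statusJson);
        return !states.isEmpty() && states.stream().allMatch(expected::equals);
    }

    /** Fetches the raw status body, e.g. baseUrl = "http://localhost:8083". */
    static String fetchStatus(String baseUrl, String connector) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(
                URI.create(baseUrl + "/connectors/" + connector + "/status"))
                .GET()
                .build();
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
    }
}
```

A scheduled job could call fetchStatus(...) periodically, pass the body to allTasksInState(...), and publish an audit record to the event topic when the condition you care about holds (for example, any task reporting FAILED).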
