Spring Kafka - Event sourcing - example of how to query any object state using Kafka API + KafkaStreams

I am using Kafka to implement an event sourcing architecture.

Suppose I am storing events in JSON format:

{"name": "ProductAdded", "productId": "1", "quantity": 3, "dateAdded": "2017-04-04"}

      

I would like to implement a query to get the quantity of a product with productId = X for a specific date.

Can you show a rough implementation of this query with Spring Kafka and KStreams?

UPDATE: I have made some progress on this using Spring Kafka KStreams, but I am getting a deserialization error.

This is my Spring Cloud Stream producer:

public interface ProductProducer{

    final String OUTPUT = "productsOut";

    @Output(ProductProducer.OUTPUT)
    MessageChannel output();

}

      

Config:

spring:
  application:
    name: product-generator-service
  cloud:
    stream:
      kafka:
        binder:
          brokers:
          - kafka
          zk-nodes:
          - kafka
        bindings:
          productsOut:
            producer:
              sync: true
      bindings:
        productsOut: 
          destination: orders
          content-type: application/json

      

I am posting using the following code, which correctly serializes the map to a JSON object:

Map<String, Object> event = new HashMap<>();
event.put("name", "ProductCreated");
event.put("productId", product.getId());
event.put("quantity", product.getQuantity());
event.put("dateAdded", new Date());
productProducer.output().send(MessageBuilder.withPayload(event).build(), 500);

      

MessageBuilder.withPayload(event).build()

→ GenericMessage [payload={quantity=1, productId=1, name=ProductCreated, dateAdded="xxxxx"}, headers={id=fc531176-e3e9-61b8-40e3-08074fabee4d, timestamp=1499845483095}]

In the ProductService application, I can read this message using the Spring Cloud Stream listener :

@Component
public class ProductListener{

    @StreamListener(ProductConsumer.INPUT)
    public void handleProduct(Map<String, Object> event){
        // ...
    }

}

      

However, with KStream I am getting a deserialization error:

@Configuration
public class KStreamsConfig {

    private static final String STREAMING_TOPIC1 = "orders";

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-service-kstream");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        //props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.serdeFrom(jsonSerializer, jsonDeserializer).getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new StreamsConfig(props);
    }

    @Bean
    public FactoryBean<KStreamBuilder> myKStreamBuilder(StreamsConfig streamsConfig) {
        return new KStreamBuilderFactoryBean(streamsConfig);
    }

    @Bean
    public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder) {

        Serde<Integer> integerSerde = Serdes.Integer();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        KStream<Integer, JsonNode> stream = kStreamBuilder.stream(null, integerSerde, jsonSerde, STREAMING_TOPIC1);
        stream.print();
        return stream;
    }

}

      

Exception

org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting ('true', 'false' or 'null')
 at [Source: [B@288e4e9a; line: 1, column: 4]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting ('true', 'false' or 'null')
 at [Source: [B@288e4e9a; line: 1, column: 4]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3528)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:30)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:46)
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:158)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:605)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

      

UPDATE 2:

To see what was reaching the KStream, I switched to String deserialization, and this is what gets printed:

KStream<Integer, String> stream = kStreamBuilder.stream(null, integerSerde, stringSerde, STREAMING_TOPIC1);

      

Print value:

[KSTREAM-SOURCE-0000000000]: null ,  contentType

      

Why am I not getting the JSON string?

UPDATE 3: I fixed the deserialization issue. The cause is that the message producer (Spring Cloud Stream) by default embeds some headers in the payload. I needed to disable this header embedding (header-mode: raw) in order to receive the messages correctly in Kafka Streams:

spring:
  application:
    name: product-service
  cloud:
    stream:
      kafka:
        binder:
          brokers:
          - kafka
          zk-nodes:
          - kafka
        bindings:
          productsOut:
            producer:
              sync: true
      bindings:
        productsIn:
          group: product-service 
          destination: orders
          consumer:
            max-attempts: 5
            header-mode: raw
        productsOut: 
          destination: orders
          content-type: application/json
          producer:
            header-mode: raw
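
The 'ÿ' token in the exception and the stray "contentType" in UPDATE 2 are consistent with a header prefix sitting in front of the JSON. The sketch below assumes the embedded-header layout used by Spring Cloud Stream 1.x as I understand it (a 0xff marker byte, a header count, then length-prefixed names and values, then the payload); that layout is an assumption here, not something shown in the question, and the class and method names are hypothetical. It only illustrates why a plain JSON deserializer cannot read such a record.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustration only: assumed layout 0xff, count(1), [nameLen(1), name, valueLen(4), value]..., payload. */
final class EmbeddedHeaderSketch {

    // Returns the payload without the assumed header prefix,
    // or the same array when it does not start with the 0xff marker.
    static byte[] stripHeaders(byte[] record) {
        if (record.length == 0 || (record[0] & 0xff) != 0xff) {
            return record;
        }
        ByteBuffer buf = ByteBuffer.wrap(record);
        buf.get();                                    // 0xff marker (prints as 'ÿ' in Latin-1)
        int headerCount = buf.get() & 0xff;
        for (int i = 0; i < headerCount; i++) {
            int nameLen = buf.get() & 0xff;
            buf.position(buf.position() + nameLen);   // skip the header name
            int valueLen = buf.getInt();              // 4-byte big-endian length
            buf.position(buf.position() + valueLen);  // skip the header value
        }
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }

    // Builds a record with a single embedded header, for demonstration only.
    static byte[] frame(String name, String value, String payload) {
        byte[] n = name.getBytes(StandardCharsets.UTF_8);
        byte[] v = value.getBytes(StandardCharsets.UTF_8);
        byte[] p = payload.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(7 + n.length + v.length + p.length);
        buf.put((byte) 0xff).put((byte) 1)
           .put((byte) n.length).put(n)
           .putInt(v.length).put(v)
           .put(p);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] framed = frame("contentType", "\"application/json\"", "{\"quantity\":1}");
        System.out.println(new String(stripHeaders(framed), StandardCharsets.UTF_8));
    }
}
```

With header-mode: raw on both sides, no such prefix is written, so the record value is the JSON itself and no stripping is needed.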

      

Definition of KStream:

KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC1);

      

Output:

[KSTREAM-SOURCE-0000000000]: null , {"quantity":0,"productId":0,"name":"ProductCreated","dateAdded":1499930385450}

      

Now that everything is set up correctly, how can I implement an interactive query like the one I need, i.e. get the quantity of a product with productId = X for a specific date?


2 answers


I managed to solve this by combining Spring Cloud Stream (to produce the messages) with Spring Kafka (to run Kafka Streams and implement the interactive queries). IMPORTANT: the header-mode: raw change described in UPDATE 3 of the question is what makes it possible to combine the two.

Kafka Streams configuration:

@Configuration
public class KStreamsConfig {

    private static final String STREAMING_TOPIC1 = "orders";

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-service-streams");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        //props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.serdeFrom(jsonSerializer, jsonDeserializer).getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new StreamsConfig(props);
    }

    @Bean
    public KStreamBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
        return new KStreamBuilderFactoryBean(streamsConfig);
    }

    @Bean
    public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder, KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {

        Serde<Integer> integerSerde = Serdes.Integer();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC1);

        stream.map( (key, value) -> {
            return new KeyValue<>(value.get("productId").asInt(), value.get("quantity").asInt());
        }).groupByKey().reduce( (v1, v2) -> v1 + v2, "ProductsStock");

        stream.print();
        return stream;
    }

}

      



Notice how I am creating a KTable backed by a state store named ProductsStock, which I will query later in the service.

ProductService

@Autowired
private KStreamBuilderFactoryBean kStreamBuilderFactoryBean;

@Override
public Integer getProductStock(Integer id) {
    KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
            streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
    return keyValueStore.get(id);
}
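
The store above is keyed by productId only, so it returns a total across all dates. One possible way to answer the per-date part of the question is to key the aggregation by product and day. The following is a plain-Java sketch with no Kafka dependency; the class name, the "productId@date" key format, and the choice of UTC are all assumptions made for illustration. It mirrors what a groupByKey().reduce(...) over such a composite key would keep in the store.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java stand-in for a state store keyed by (productId, day). */
final class StockByDateSketch {

    // Stand-in for the state store: composite key -> summed quantity.
    private final Map<String, Integer> store = new HashMap<>();

    // Hypothetical composite key, e.g. "1@2017-07-13"; the day is taken in UTC.
    static String key(int productId, LocalDate day) {
        return productId + "@" + day;
    }

    // Mirrors reduce((v1, v2) -> v1 + v2): sum quantities per composite key.
    // dateAddedMillis is epoch milliseconds, as in the JSON printed in UPDATE 3.
    void apply(int productId, int quantity, long dateAddedMillis) {
        LocalDate day = Instant.ofEpochMilli(dateAddedMillis)
                .atZone(ZoneOffset.UTC)
                .toLocalDate();
        store.merge(key(productId, day), quantity, Integer::sum);
    }

    // Mirrors a key lookup on the store; returns null when nothing matched.
    Integer quantityOn(int productId, LocalDate day) {
        return store.get(key(productId, day));
    }

    public static void main(String[] args) {
        StockByDateSketch sketch = new StockByDateSketch();
        long dateAdded = 1499930385450L; // value taken from the output in the question
        sketch.apply(1, 3, dateAdded);
        sketch.apply(1, 2, dateAdded + 1_000L);
        System.out.println(sketch.quantityOn(1, LocalDate.of(2017, 7, 13)));
    }
}
```

In the Kafka Streams topology this would roughly correspond to emitting the composite key in the map step and grouping with a String key serde instead of the Integer default. Note that this sums the quantity added on a given day; a running stock level as of a date would need a different aggregation.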

      



The upcoming 1.3.0.M1 release of the Spring Cloud Stream Kafka binder will support KStream binding. There is a PR where you can track the progress of this initiative.

Here is a more general example (WordCount) using the KStream binder: WordCount example using Spring Cloud Stream support for Kafka Streams

With this, you can achieve what you are looking for in the following way.

This StreamListener method listens to a Kafka topic and writes to another topic the count of products with ID equal to 123, computed over 30-second time windows.

@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class ProductCountApplication {

  public static final int PRODUCT_ID = 123;

  @StreamListener("input")
  @SendTo("output")
  public KStream<?, String> process(KStream<?, Product> input) {

        return input
                .filter((key, product) -> product.getID() == PRODUCT_ID)
                .map((k,v) -> new KeyValue<>(v, v))
                .groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))
                .count(TimeWindows.of(30000), "product-store")
                .toStream()
                .map((w,c) -> new KeyValue<>(null, "Product with id 123 count: " + c));
  }

}
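
To make the windowing concrete: TimeWindows.of(30000) with its default advance defines non-overlapping 30-second windows aligned to the epoch, so each record falls into exactly one window determined by its timestamp. The following is a plain-Java sketch of that bucketing with no Kafka dependency; the class and method names are hypothetical, and it counts per window only (the listener above additionally groups by the product value).

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of counting matching events in 30-second tumbling windows. */
final class TumblingCountSketch {

    static final long WINDOW_MS = 30_000L;
    static final int PRODUCT_ID = 123;

    // Window start (epoch ms) -> number of matching events in that window.
    private final Map<Long, Long> counts = new TreeMap<>();

    // Start of the window containing the timestamp (windows are aligned to epoch 0).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // Mirrors filter(...) followed by a windowed count.
    void onEvent(int productId, long timestampMs) {
        if (productId != PRODUCT_ID) {
            return;
        }
        counts.merge(windowStart(timestampMs), 1L, Long::sum);
    }

    long countInWindow(long timestampMs) {
        return counts.getOrDefault(windowStart(timestampMs), 0L);
    }

    public static void main(String[] args) {
        TumblingCountSketch sketch = new TumblingCountSketch();
        sketch.onEvent(123, 1_000L);
        sketch.onEvent(123, 29_999L);
        sketch.onEvent(123, 30_000L); // first event of the next window
        System.out.println(sketch.countInWindow(0L) + " / " + sketch.countInWindow(30_000L));
    }
}
```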

      



This is the application.yml used:

spring.cloud.stream.kstream.binder.streamConfiguration:
  key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde # Use a native Kafka Serde for the key
  value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde # Use a native Kafka Serde for the value
spring.cloud.stream.bindings.output.producer:
  headerMode: raw # Outbound data has no embedded headers
  useNativeEncoding: true # Write data using the native Serde
spring.cloud.stream.bindings.input.consumer:
  headerMode: raw # Incoming data has no embedded headers

      

When you run the program, you need to pass the input and output destinations (topics):

--spring.cloud.stream.bindings.input.destination=products 
--spring.cloud.stream.bindings.output.destination=counts

      
