Spring Kafka - Event sourcing - example of how to query any object state using Kafka API + KafkaStreams
I am using Kafka to implement an event sourcing architecture.
Suppose I am storing events in JSON format:
{"name": "ProductAdded", "productId": "1", "quantity": 3, "dateAdded": "2017-04-04"}
I would like to implement a query to get the quantity of a product with productId = X for a specific date.
Can you show a rough implementation of this query with Spring Kafka KStreams?
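Before bringing Kafka in, it may help to pin down what the query has to compute. The sketch below is a plain-Java, in-memory stand-in, not Kafka Streams code: it folds events into a map and sums the quantities per product and day. The class name, the composite key format `productId@date`, and the reading of "quantity for a specific date" as "total quantity added on that day" are all assumptions made for illustration.

```java
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;

// In-memory model of the query; in a streams application a state store
// would play the role of this map.
class ProductQuantityIndex {
    private final Map<String, Integer> quantityByProductAndDate = new HashMap<>();

    // Hypothetical composite key: "<productId>@<ISO date>".
    static String key(String productId, LocalDate date) {
        return productId + "@" + date;
    }

    // Fold one event into the index by adding its quantity to that day's total.
    void apply(String productId, int quantity, LocalDate dateAdded) {
        quantityByProductAndDate.merge(key(productId, dateAdded), quantity, Integer::sum);
    }

    // Total quantity added for the product on the given date, or 0 if none.
    int quantityOn(String productId, LocalDate date) {
        return quantityByProductAndDate.getOrDefault(key(productId, date), 0);
    }

    public static void main(String[] args) {
        ProductQuantityIndex index = new ProductQuantityIndex();
        index.apply("1", 3, LocalDate.parse("2017-04-04"));
        index.apply("1", 2, LocalDate.parse("2017-04-04"));
        index.apply("1", 5, LocalDate.parse("2017-04-05"));
        System.out.println(index.quantityOn("1", LocalDate.parse("2017-04-04"))); // prints 5
    }
}
```

If the query instead means "stock as of a date", the same idea applies with a running total per product ordered by date.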
UPDATE: I have made some progress with Spring Kafka KStreams, but I am getting a deserialization error.
This is my Spring Cloud Stream producer:
public interface ProductProducer {
    final String OUTPUT = "productsOut";

    @Output(ProductProducer.OUTPUT)
    MessageChannel output();
}
Config:
spring:
  application:
    name: product-generator-service
  cloud:
    stream:
      kafka:
        binder:
          brokers:
            - kafka
          zk-nodes:
            - kafka
        bindings:
          productsOut:
            producer:
              sync: true
      bindings:
        productsOut:
          destination: orders
          content-type: application/json
I publish events with the following code, which correctly serializes the map to a JSON object:
Map<String, Object> event = new HashMap<>();
event.put("name", "ProductCreated");
event.put("productId", product.getId());
event.put("quantity", product.getQuantity());
event.put("dateAdded", new Date());
productProducer.output().send(MessageBuilder.withPayload(event).build(), 500);
MessageBuilder.withPayload(event).build()
→ GenericMessage [payload={quantity=1, productId=1, name=ProductCreated, dateAdded="xxxxx"}, headers={id=fc531176-e3e9-61b8-40e3-08074fabee4d, timestamp=1499845483095}]
In the ProductService application, I can read this message using the Spring Cloud Stream listener :
@Component
public class ProductListener{
@StreamListener(ProductConsumer.INPUT)
public void handleProduct(Map<String, Object> event){
However, with KStream I get a deserialization error:
@Configuration
public class KStreamsConfig {
    private static final String STREAMING_TOPIC1 = "orders";

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-service-kstream");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        //props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.serdeFrom(jsonSerializer, jsonDeserializer).getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new StreamsConfig(props);
    }

    @Bean
    public FactoryBean<KStreamBuilder> myKStreamBuilder(StreamsConfig streamsConfig) {
        return new KStreamBuilderFactoryBean(streamsConfig);
    }

    @Bean
    public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder) {
        Serde<Integer> integerSerde = Serdes.Integer();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
        KStream<Integer, JsonNode> stream = kStreamBuilder.stream(null, integerSerde, jsonSerde, STREAMING_TOPIC1);
        stream.print();
        return stream;
    }
}
Exception
org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting ('true', 'false' or 'null')
at [Source: [B@288e4e9a; line: 1, column: 4]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting ('true', 'false' or 'null')
at [Source: [B@288e4e9a; line: 1, column: 4]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3528)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:30)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:46)
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:158)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:605)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
UPDATE 2:
To see what was actually reaching the KStream, I switched to String deserializers for both key and value, and this is what gets printed:
KStream<Integer, String> stream = kStreamBuilder.stream(null, integerSerde, stringSerde, STREAMING_TOPIC1);
Print value:
[KSTREAM-SOURCE-0000000000]: null , contentType
Why am I not getting the JSON string?
UPDATE 3: I fixed the deserialization issue. The cause is that the message producer (Spring Cloud Stream) by default embeds some headers in the payload. I had to disable this header embedding so that Kafka Streams receives the messages as plain JSON:
spring:
  application:
    name: product-service
  cloud:
    stream:
      kafka:
        binder:
          brokers:
            - kafka
          zk-nodes:
            - kafka
        bindings:
          productsOut:
            producer:
              sync: true
      bindings:
        productsIn:
          group: product-service
          destination: orders
          consumer:
            max-attempts: 5
            header-mode: raw
        productsOut:
          destination: orders
          content-type: application/json
          producer:
            header-mode: raw
Definition of KStream:
KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC1);
Output:
[KSTREAM-SOURCE-0000000000]: null , {"quantity":0,"productId":0,"name":"ProductCreated","dateAdded":1499930385450}
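The exception earlier in the question shows the JSON parser failing at the very start of the record, which suggests the value did not begin with JSON at all. A small check along the following lines can confirm what kind of bytes are on the topic before wiring up a JSON serde. This is a hypothetical helper (the class and method names are not part of Spring or Kafka), and it only inspects the first non-whitespace byte; it makes no attempt to parse the embedded header format.

```java
import java.nio.charset.StandardCharsets;

class PayloadSniffer {
    // Returns true if the first non-whitespace byte opens a JSON object or array.
    static boolean looksLikeJson(byte[] value) {
        if (value == null) {
            return false;
        }
        for (byte b : value) {
            if (b == ' ' || b == '\t' || b == '\n' || b == '\r') {
                continue; // skip leading whitespace
            }
            return b == '{' || b == '[';
        }
        return false; // empty or whitespace-only payload
    }

    public static void main(String[] args) {
        byte[] json = "{\"productId\":1}".getBytes(StandardCharsets.UTF_8);
        byte[] framed = {(byte) 0xFF, 0x01}; // illustrative non-JSON prefix only
        System.out.println(looksLikeJson(json));   // prints true
        System.out.println(looksLikeJson(framed)); // prints false
    }
}
```

Running such a check against the raw bytes (for example with a byte-array serde) should return true once `header-mode: raw` is in effect on the producer.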
Now that everything is set up correctly, how can I implement an interactive query like the one I need: get the quantity of a product with productId = X for a specific date?
I managed to solve this problem by combining Spring Cloud Stream (to produce messages) with Spring Kafka (to run Kafka Streams and serve interactive queries). IMPORTANT: the header-mode change described in UPDATE 3 of the question is what makes it possible to combine the two.
Kafka Streams configuration:
@Configuration
public class KStreamsConfig {
    private static final String STREAMING_TOPIC1 = "orders";

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-service-streams");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        //props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.serdeFrom(jsonSerializer, jsonDeserializer).getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new StreamsConfig(props);
    }

    @Bean
    public KStreamBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
        return new KStreamBuilderFactoryBean(streamsConfig);
    }

    @Bean
    public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder, KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {
        Serde<Integer> integerSerde = Serdes.Integer();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
        KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC1);
        stream.map( (key, value) -> {
            return new KeyValue<>(value.get("productId").asInt(), value.get("quantity").asInt());
        }).groupByKey().reduce( (v1, v2) -> v1 + v2, "ProductsStock");
        stream.print();
        return stream;
    }
}
Notice how I create a KTable backed by a state store named ProductsStock, which I query later in the service.
ProductService:
@Autowired
private KStreamBuilderFactoryBean kStreamBuilderFactoryBean;

@Override
public Integer getProductStock(Integer id) {
    KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
        streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
    return keyValueStore.get(id);
}
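One detail worth handling in `getProductStock`: `keyValueStore.get(id)` returns null when no event for that product has been processed yet, so callers that unbox the result can hit a NullPointerException. The following is a minimal sketch of a null-safe wrapper. The class and method names are hypothetical, and a plain `Function` stands in for the store's `get` method so the example runs without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class StockLookup {
    // 'store' stands in for keyValueStore::get, which returns null for unknown keys.
    static int stockOrZero(Function<Integer, Integer> store, Integer productId) {
        Integer stock = store.apply(productId);
        return stock == null ? 0 : stock;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> fakeStore = new HashMap<>();
        fakeStore.put(1, 4);
        System.out.println(stockOrZero(fakeStore::get, 1));  // prints 4
        System.out.println(stockOrZero(fakeStore::get, 99)); // prints 0
    }
}
```

Whether 0 is the right default, rather than an empty Optional or an error, depends on whether callers need to tell "unknown product" apart from "no stock".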
The upcoming 1.3.0.M1 release of the Spring Cloud Stream Kafka binder will support KStream bindings. There is a PR where you can track the progress of this initiative.
Here is a more general example (WordCount) using the KStream binder: WordCount example using Spring Cloud Stream support for Kafka Streams
With this, you can achieve what you are looking for in the following way.
This StreamListener method reads from a Kafka topic and writes to another topic the count of products with ID 123, computed over 30-second time windows.
@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class ProductCountApplication {
    // ID of the product whose events are counted
    public static final int PRODUCT_ID = 123;

    @StreamListener("input")
    @SendTo("output")
    public KStream<?, String> process(KStream<?, Product> input) {
        return input
            .filter((key, product) -> product.getID() == PRODUCT_ID)
            .map((k,v) -> new KeyValue<>(v, v))
            .groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))
            .count(TimeWindows.of(30000), "product-store")
            .toStream()
            .map((w,c) -> new KeyValue<>(null, "Product with id 123 count: " + c));
    }
}
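Note that `TimeWindows.of(30000)` defines fixed, non-overlapping 30-second windows rather than a sliding "last 30 seconds", so each record is counted in exactly one window. The plain-Java model below sketches that bucketing under the assumption that windows are aligned to the epoch and timestamps are non-negative; the class and method names are hypothetical and nothing here uses the Kafka Streams API.

```java
import java.util.Map;
import java.util.TreeMap;

class TumblingWindowCount {
    // Start of the fixed-size window containing the timestamp (aligned to epoch 0).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Count timestamps per window, keyed by window start in ascending order.
    static Map<Long, Long> countPerWindow(long[] timestampsMs, long sizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts, sizeMs), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] events = {1_000L, 29_999L, 30_000L, 61_000L};
        System.out.println(countPerWindow(events, 30_000L)); // prints {0=2, 30000=1, 60000=1}
    }
}
```

The consequence for the example above is that the output topic receives one running count per 30-second bucket, not a single continuously updated figure.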
Here is the application.yml used:
spring.cloud.stream.kstream.binder.streamConfiguration:
  key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde # Use a native Kafka Serde for the key
  value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde # Use a native Kafka Serde for the value
spring.cloud.stream.bindings.output.producer:
  headerMode: raw # Outbound data has no embedded headers
  useNativeEncoding: true # Write data using the native Serde
spring.cloud.stream.bindings.input.consumer:
  headerMode: raw # Incoming data has no embedded headers
When you run the program, you need to pass the input and output destinations (topics):
--spring.cloud.stream.bindings.input.destination=products
--spring.cloud.stream.bindings.output.destination=counts