Generic processing in Avro encoder / decoder
I am using spring-integration-kafka to send and receive messages from Kafka. The message object is based on a generic type. The base class looks like this:
public abstract class AbstractMessage<T> implements Serializable {
...
}
I would like to be able to send and receive various implementations of this class. I am using the Avro-based encoder and decoder:
<bean id="kafkaMessageEncoder"
      class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
    <constructor-arg value="com.....model.AbstractMessage" />
</bean>
<bean id="kafkaMessageDecoder"
      class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaDecoder">
    <constructor-arg value="com.....model.AbstractMessage" />
</bean>
This fails with the error:
AvroTypeException: Unknown Type: T
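The T in that message looks like the unresolved type parameter of the base class. As a rough sketch of what a reflection-based schema generator would see, the snippet below uses only java.lang.reflect, not Avro itself. The nested AbstractMessage and its payload and id fields are hypothetical stand-ins, since the real class body is elided above.

```java
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class TypeVariableDemo {

    // Hypothetical stand-in for the AbstractMessage<T> in the question.
    public abstract static class AbstractMessage<T> implements Serializable {
        private static final long serialVersionUID = 1L;
        protected T payload;   // hypothetical field whose type is the type parameter
        protected String id;   // hypothetical field with a concrete type
    }

    /** Returns true when the field's declared generic type is an unresolved type variable. */
    public static boolean isTypeVariable(Class<?> type, String fieldName)
            throws NoSuchFieldException {
        Field field = type.getDeclaredField(fieldName);
        Type generic = field.getGenericType();
        return generic instanceof TypeVariable;
    }

    public static void main(String[] args) throws Exception {
        // payload is declared as T, so reflection only reports the type variable
        System.out.println("payload: " + isTypeVariable(AbstractMessage.class, "payload")); // prints "payload: true"
        // id is declared as String, so reflection reports a concrete class
        System.out.println("id: " + isTypeVariable(AbstractMessage.class, "id"));           // prints "id: false"
    }
}
```

Reflection on the class alone gives no concrete type for a field declared as T, so there is nothing from which a schema for that field could be built.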
This makes sense, since Avro cannot derive a schema for the unbound type parameter T declared on AbstractMessage. So I decided to write my own encoder and decoder, based on plain Java serialization:
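import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import kafka.serializer.Encoder; // assumed: the Kafka 0.8 Scala-client Encoder interface

public class MessageEncoder implements Encoder<Object> {
    @Override
    public byte[] toBytes(Object object) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // try-with-resources flushes and closes the stream, even on failure
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(object);
        } catch (IOException e) {
            // Fail loudly instead of silently publishing an empty payload
            throw new IllegalStateException("Failed to serialize message", e);
        }
        return baos.toByteArray();
    }
}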
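import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import kafka.serializer.Decoder; // assumed: the Kafka 0.8 Scala-client Decoder interface

public class MessageDecoder implements Decoder<Object> {
    @Override
    public Object fromBytes(byte[] bs) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bs))) {
            // The stream carries the runtime class, so the concrete subclass is restored
            return (AbstractMessage<?>) ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            // Fail loudly instead of handing a null message to the consumer
            throw new IllegalStateException("Failed to deserialize message", e);
        }
    }
}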
And it works great.
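For reference, here is a minimal round trip along the same lines that runs without Kafka. It is only a sketch: TextMessage, the payload field, and the static helper methods are hypothetical names for illustration, not part of my actual model.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class RoundTripDemo {

    // Hypothetical stand-in for the generic base class.
    public abstract static class AbstractMessage<T> implements Serializable {
        private static final long serialVersionUID = 1L;
        protected final T payload;
        protected AbstractMessage(T payload) { this.payload = payload; }
        public T getPayload() { return payload; }
    }

    // Hypothetical concrete implementation that binds T to String.
    public static class TextMessage extends AbstractMessage<String> {
        private static final long serialVersionUID = 1L;
        public TextMessage(String payload) { super(payload); }
    }

    /** Serializes any Serializable object graph to a byte array. */
    public static byte[] toBytes(Object object) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(object);
        }
        return baos.toByteArray();
    }

    /** Restores the object, including its concrete runtime class. */
    public static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Object restored = fromBytes(toBytes(new TextMessage("hello")));
        System.out.println(restored.getClass().getSimpleName());            // prints "TextMessage"
        System.out.println(((AbstractMessage<?>) restored).getPayload());   // prints "hello"
    }
}
```

Java serialization writes the runtime class of each object into the stream, so it never has to know what T is in advance. A schema-based format has to describe every field's type before any data is written.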
My question is this: I am not doing anything special here, so why can't Avro do the same? And is there another Avro encoder / decoder that can serialize and deserialize objects of my generic type?
Thanks.