Generic processing in Avro encoder / decoder

I am using spring-intergration-kafka

to send and receive messages from kafka. The message object is based on a generic type. The base class looks like this

public abstract class AbstractMessage<T> implements Serializable {
 ...
}

      

I would like to be able to send and receive various implementations of this class. I am using avro based encoders and decoders

<bean id="kafkaMessageEncoder"
    class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
    <constructor-arg value="com.....model.AbstractMessage" />
</bean>

<bean id="kafkaMessageDecoder"
    class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaDecoder">
    <constructor-arg value="com.....model.AbstractMessage" />
</bean>

      

This fails with error

AvroTypeException: Unknown Type: T

This makes sense since Avro cannot define the generic type specified in the AbstractMessage class, but then I decided to use my own encoder and decoder.

public class MessageEncoder implements Encoder<Object> {

@Override
public byte[] toBytes(Object object) {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeObject(object);
        oos.flush();
        oos.close();
        return baos.toByteArray();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return "".getBytes();
}

}

public class MessageDecoder implements Decoder<Object> {

@Override
public Object fromBytes(byte[] bs) {
    try {
        ObjectInputStream bais = new ObjectInputStream(new ByteArrayInputStream(bs));
        AbstractMessage<?> message = (AbstractMessage<?>) bais.readObject();
        return message;
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}
}

      

And it works great.

I guess my question is that I am not doing anything special here. Why can't avro do the same and there is another avro d / encoder that can serialize / deserialize objects of my generic type.

thank

+3


source to share





All Articles