How to use ValueMapper to change value type in Kafka 10.2 streams with Scala

I am trying to upgrade my Scala Kafka Streams based application from 0.10.0.0 to 0.10.2.1 and I cannot figure out how to get the application to compile.

The example I found in the documentation is used mapValue

, but it doesn't change the type of the values. I am using Scala 2.11 with the compiler flag as -Xexperimental

per this .

code

class MyStream() {
  def startMyStream(): Unit = {
    val kStreamBuilder = new KStreamBuilder
    val kStream = kStreamBuilder.stream("myTopic")

    kStream.mapValues(new ValueMapper[AnyRef, Double]() {
      override def apply(value: Any) = 6.3
    })

    val kafkaStreams = new KafkaStreams(kStreamBuilder,  new Properties)
    kafkaStreams.start()
  }
}

      

Compiler error

no type parameters for method mapValues: (x$1: org.apache.kafka.streams.kstream.ValueMapper[_, _ <: VR])org.apache.kafka.streams.kstream.KStream[Nothing,VR] exist so that it can be applied to arguments (org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]{})
 --- because ---
argument expression type is not compatible with formal parameter type;
  found   : org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]
  required: org.apache.kafka.streams.kstream.ValueMapper[_, _ <: ?VR]
Note: AnyRef <: Any, but Java-defined trait ValueMapper is invariant in type V.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
    kStream.mapValues(new ValueMapper[AnyRef, Double]() {
            ^
type mismatch;
  found   : org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]{}
  required: org.apache.kafka.streams.kstream.ValueMapper[_, _ <: VR]
    kStream.mapValues(new ValueMapper[AnyRef, Double]() {
                      ^
two errors found

      

Expressed as a java class, this one compiles just fine.

public class MyStream {
    public void startMyStream() {
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KStream kStream = kStreamBuilder.stream("myTopic");

        kStream.mapValues(new ValueMapper<Object, Double>() {
            @Override
            public Double apply(Object value) {
                return 6.3;
            }
        });

        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, new Properties());
        kafkaStreams.start();
    }
}

      

How do I compile the Scala version?


Solution based on this answer

Three ways to work and two that don't.

class MyStream() {
  def startMyStream(): Unit = {
    val kStreamBuilder = new KStreamBuilder

    // Explicit tying here is not required to compile and run. 
    val kStream: KStream[Array[Byte], String] = kStreamBuilder.stream("myTopic")

    // Does not compile
    kStream.mapValues(new ValueMapper[AnyRef, Double]() {
      override def apply(value: AnyRef) = 6.3
    })

    // Does not compile
    kStream.mapValues(_ => 6.3)

    // Works
    kStream.mapValues[Double](new ValueMapper[AnyRef, Double]() {
      override def apply(value: AnyRef) = 6.3
    })

    // Works, requires compiler option -Xexperimental
    kStream.mapValues[Double](_ => 6.3)

    // Works, requires compiler option -Xexperimental
    kStream.mapValues[Double](convert)
    def convert(string: String): Double = 6.3

    val kafkaStreams = new KafkaStreams(kStreamBuilder, new Properties)
    kafkaStreams.start()
  }
}

      


Update: solutions that don't work

Attempt 1 Adding explicit types to kStream as suggested by ( val kStream: KStream[Array[Byte], String] = kStreamBuilder.stream("myTopic")

) still fails to compile and results in this error.

no type parameters for method mapValues: (x$1: org.apache.kafka.streams.kstream.ValueMapper[_ >: String, _ <: VR])org.apache.kafka.streams.kstream.KStream[Array[Byte],VR] exist so that it can be applied to arguments (org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]{})
--- because ---
argument expression type is not compatible with formal parameter type;
  found   : org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]
  required: org.apache.kafka.streams.kstream.ValueMapper[_ >: String, _ <: ?VR]
Note: AnyRef <: Any, but Java-defined trait ValueMapper is invariant in type V.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
kStream.mapValues(new ValueMapper[AnyRef, Double]() {
  ^
  type mismatch;
  found   : org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]{}
  required: org.apache.kafka.streams.kstream.ValueMapper[_ >: String, _ <: VR]
  kStream.mapValues(new ValueMapper[AnyRef, Double]() {

      

Attempt 2 Adding the above and using SAM transforms for "avoid writing an anonymous class instance explicitly" ( kStream.mapValues(_ => 6.3)

) resulted in this compiler's error.

no type parameters for method mapValues: (x$1: org.apache.kafka.streams.kstream.ValueMapper[_ >: String, _ <: VR])org.apache.kafka.streams.kstream.KStream[Array[Byte],VR] exist so that it can be applied to arguments (org.apache.kafka.streams.kstream.ValueMapper[String,Double] with Serializable)
--- because ---
argument expression type is not compatible with formal parameter type;
  found   : org.apache.kafka.streams.kstream.ValueMapper[String,Double] with Serializable
  required: org.apache.kafka.streams.kstream.ValueMapper[_ >: String, _ <: ?VR]
Note: String <: Any (and org.apache.kafka.streams.kstream.ValueMapper[String,Double] with Serializable <: org.apache.kafka.streams.kstream.ValueMapper[String,Double]), but Java-defined trait ValueMapper is invariant in type V.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
kStream.mapValues(_ => 6.3)
        ^
type mismatch;
found   : org.apache.kafka.streams.kstream.ValueMapper[String,Double] with Serializable
required: org.apache.kafka.streams.kstream.ValueMapper[_ >: String, _ <: VR]
kStream.mapValues(_ => 6.3)
                    ^

      

+3


source to share


1 answer


There are two problems here. First, you don't specify the type of stream:

val kStream: KStream[Array[Byte], String] = kStreamBuilder.stream("myTopic")

      

This is actually the reason for the error you are seeing - since you are not specifying types, Scala infers them to some default values, which is almost always not what you want.

Second, since you've included -Xexperimental

, you can rely on SAM conversions to avoid explicitly writing an instance of the anonymous class:



kStream.mapValues(_ => 6.3)

      

Update: It seems that for some reason the Scala compiler is unable to correctly specify the output type of the anonymous function / SAM instance here. I was able to compile the code with the following little tweak:

kStream.mapValues[Double](_ => 6.3)

      

+5


source







All Articles