Using Serializable lambda in Spark JavaRDD transform

I am trying to understand the following code.

// File: LambdaTest.java

package test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class LambdaTest implements Ops {

  public static void main(String[] args) {
    new LambdaTest().job();
  }

  public void job() {
    SparkConf conf = new SparkConf()
      .setAppName(LambdaTest.class.getName())
      .setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    List<Integer>              lst  = Arrays.asList(1, 2, 3, 4, 5, 6);
    JavaRDD<Integer>           rdd  = jsc.parallelize(lst);
    Function<Integer, Integer> func1 = (Function<Integer, Integer> & Serializable) x -> x * x;
    Function<Integer, Integer> func2 = x -> x * x;

    System.out.println(func1.getClass());  //test.LambdaTest$$Lambda$8/390374517
    System.out.println(func2.getClass());  //test.LambdaTest$$Lambda$9/208350681

    this.doSomething(rdd, func1);  // works
    this.doSomething(rdd, func2);  // org.apache.spark.SparkException: Task not serializable
  }
}

// File: Ops.java

package test;

import org.apache.spark.api.java.JavaRDD;
import java.util.function.Function;    

public interface Ops {

  default void doSomething(JavaRDD<Integer> rdd, Function<Integer, Integer> func) {
    rdd.map(x -> x + func.apply(x))
       .collect()
       .forEach(System.out::println);
  }

}

The only difference is that func1 is created with an intersection cast to Serializable, while func2 is not.

Looking at the runtime classes of the two functions, both are anonymous classes generated inside LambdaTest.

Both are used in the RDD transformation defined in the interface, so I would expect that the two functions, and LambdaTest itself, must be serialized.

As you can see, LambdaTest does not implement Serializable, so I would expect neither function to work. Surprisingly, func1 works.

The stack trace for func2 is the following:

Serialization stack:
    - object not serializable (class: test.LambdaTest$$Lambda$9/208350681, value: test.LambdaTest$$Lambda$9/208350681@61d84e08)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=interface fr.leboncoin.etl.jobs.test.Ops, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic fr/leboncoin/etl/jobs/test/Ops.lambda$doSomething$1024e30a$1:(Ljava/util/function/Function;Ljava/lang/Integer;)Ljava/lang/Integer;, instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class fr.leboncoin.etl.jobs.test.Ops$$Lambda$10/1470295349, fr.leboncoin.etl.jobs.test.Ops$$Lambda$10/1470295349@4e1459ea)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    ... 19 more


It seems that if the function itself is Serializable, the object containing it does not have to be serialized, which confuses me.

Any explanation would be much appreciated.

------------------------------ Updates ------------------------------

I tried using an abstract class instead of an interface:

// File: AbstractTest.java

package test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class AbstractTest {

  public static void main(String[] args) {
    new AbstractTest().job();
  }

  public void job() {
    SparkConf conf = new SparkConf()
      .setAppName(AbstractTest.class.getName())
      .setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    List<Integer>    lst = Arrays.asList(1, 2, 3, 4, 5, 6);
    JavaRDD<Integer> rdd = jsc.parallelize(lst);

    Ops ops = new Ops() {

      @Override
      public Integer apply(Integer x) {
        return x + 1;
      }
    };

    System.out.println(ops.getClass()); // class fr.leboncoin.etl.jobs.test.AbstractTest$1
    ops.doSomething(rdd);
  }
}


// File: Ops.java

package test;

import org.apache.spark.api.java.JavaRDD;

import java.io.Serializable;

public abstract class Ops implements Serializable {

  public abstract Integer apply(Integer x);

  public void doSomething(JavaRDD<Integer> rdd) {
    rdd.map(x -> x + apply(x))
       .collect()
       .forEach(System.out::println);
  }
}


It does not work, even though the class Ops is compiled in a separate file from AbstractTest. The class name of the ops object is fr.leboncoin.etl.jobs.test.AbstractTest$1. According to the following stack trace, it seems that AbstractTest needs to be serialized in order to serialize the anonymous subclass AbstractTest$1.

Serialization stack:
    - object not serializable (class: test.AbstractTest, value: test.AbstractTest@21ac5eb4)
    - field (class: test.AbstractTest$1, name: this$0, type: class test.AbstractTest)
    - object (class test.AbstractTest$1, test.AbstractTest$1@36fc05ff)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class fr.leboncoin.etl.jobs.test.Ops, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial fr/leboncoin/etl/jobs/test/Ops.lambda$doSomething$6d6228b6$1:(Ljava/lang/Integer;)Ljava/lang/Integer;, instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class fr.leboncoin.etl.jobs.test.Ops$$Lambda$8/208350681, fr.leboncoin.etl.jobs.test.Ops$$Lambda$8/208350681@4acb2510)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    ... 19 more
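The this$0 capture shown in the stack trace can be reproduced without Spark. The sketch below (class and method names are my own, not from the original post) shows that an anonymous subclass created in an instance method holds a hidden this$0 reference to the enclosing object, so serializing it fails unless the outer class is also serializable, while creating the instance in a static context avoids the capture:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

  interface Op extends Serializable {
    int apply(int x);
  }

  Op anonymous() {
    // Anonymous class in an instance method: captures this$0 -> CaptureDemo.
    return new Op() {
      @Override public int apply(int x) { return x + 1; }
    };
  }

  static Op staticNested() {
    // Anonymous class in a static context: no enclosing instance is captured.
    return new Op() {
      @Override public int apply(int x) { return x + 1; }
    };
  }

  static void trySerialize(Op op) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    new ObjectOutputStream(bos).writeObject(op);
  }

  public static void main(String[] args) throws Exception {
    try {
      trySerialize(new CaptureDemo().anonymous());
      System.out.println("anonymous: serialized");
    } catch (NotSerializableException e) {
      // Fails because the captured CaptureDemo instance is not serializable.
      System.out.println("anonymous: " + e.getMessage());
    }
    trySerialize(staticNested());
    System.out.println("static context: serialized");
  }
}
```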


1 answer


LambdaTest does not need to be Serializable, since it is not sent over the wire; there is no reason for that.

On the other hand, both func1 and func2 must be Serializable, because Spark ships them to the executors to perform the computation on the RDD. Note that even though both are defined in the same class, each lambda is compiled into its own synthetic method and materialized as a separate class at runtime, so the whole enclosing class does not have to travel with it; hence the outer class itself need not be Serializable.
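For completeness, another way to get a serializable lambda without any cast is to make the target functional interface itself extend Serializable; every lambda targeting it is then serializable. This is exactly how Spark's own org.apache.spark.api.java.function.Function interface works, which is why using it instead of java.util.function.Function avoids the problem. A minimal sketch (SerFunction is my own name, not from the post):

```java
import java.io.Serializable;

public class SerFunctionDemo {

  // Functional interface that extends Serializable, the same pattern as
  // Spark's org.apache.spark.api.java.function.Function.
  interface SerFunction<T, R> extends Serializable {
    R apply(T t);
  }

  public static void main(String[] args) {
    SerFunction<Integer, Integer> f = x -> x * x;   // no cast required
    System.out.println(f instanceof Serializable);  // true
    System.out.println(f.apply(5));                 // 25
  }
}
```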



As for why func1 works: when you do not use a cast, the Java compiler infers the type of the lambda expression for you. So in the case of func2, the generated code simply implements Function, since that is the type of the target variable. On the other hand, when the desired type cannot be inferred from the context (as in your case: the compiler has no way of knowing that func1 has to be Serializable, since that is a requirement coming from Spark), you can use an intersection cast, as in your example, to specify the type explicitly. In that case the code generated by the compiler implements both the Function and Serializable interfaces, and the compiler does not try to infer the type on its own.
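The effect of the intersection cast can be demonstrated outside Spark. A sketch (class name is my own): the cast makes the compiler emit a lambda that implements both Function and Serializable, so only that version can be written to an object stream:

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class IntersectionCastDemo {
  public static void main(String[] args) throws Exception {
    Function<Integer, Integer> plain = x -> x * x;
    Function<Integer, Integer> ser =
        (Function<Integer, Integer> & Serializable) x -> x * x;

    System.out.println(plain instanceof Serializable); // false
    System.out.println(ser instanceof Serializable);   // true

    // Only the cast version can actually be serialized.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    new ObjectOutputStream(bos).writeObject(ser);
    System.out.println(bos.size() > 0);                // true
  }
}
```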

You can find this described in the State of the Lambda document, in the section on contexts for typing (assignment and cast contexts).
