Task Not Serializable in Spark

I have a transformation like this:

JavaRDD<Tuple2<String, Long>> mappedRdd = myRDD.values().map(
    new Function<Pageview, Tuple2<String, Long>>() {
      @Override
      public Tuple2<String, Long> call(Pageview pageview) throws Exception {
        String key = pageview.getUrl().toString();
        Long value = getDay(pageview.getTimestamp());
        return new Tuple2<>(key, value);
      }
    });


Pageview is defined in Pageview.java,

and I register this class with Kryo in Spark like so:

Class[] c = new Class[1];
c[0] = Pageview.class;
sparkConf.registerKryoClasses(c);
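
For completeness, here is a minimal sketch of how the serializer itself is enabled on the SparkConf (the app name is illustrative, and this assumes the same sparkConf is later passed to the JavaSparkContext):

import org.apache.spark.SparkConf;

SparkConf sparkConf = new SparkConf()
    .setAppName("ExampleSpark") // illustrative app name
    // switch RDD data serialization from the default Java serializer to Kryo
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.registerKryoClasses(new Class<?>[] { Pageview.class });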


Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:89)
    at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
    at org.apache.gora.tutorial.log.ExampleSpark.run(ExampleSpark.java:100)
    at org.apache.gora.tutorial.log.ExampleSpark.main(ExampleSpark.java:53)
Caused by: java.io.NotSerializableException: org.apache.gora.tutorial.log.ExampleSpark
Serialization stack:
    - object not serializable (class: org.apache.gora.tutorial.log.ExampleSpark, value: org.apache.gora.tutorial.log.ExampleSpark@1a2b4497)
    - field (class: org.apache.gora.tutorial.log.ExampleSpark$1, name: this$0, type: class org.apache.gora.tutorial.log.ExampleSpark)
    - object (class org.apache.gora.tutorial.log.ExampleSpark$1, org.apache.gora.tutorial.log.ExampleSpark$1@4ab2775d)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 7 more

When I debug the code, I can see that JavaSerializer.scala gets called even though KryoSerializer is configured.

PS 1: I don't want to use the Java serializer, but implementing Serializable on Pageview doesn't solve the problem either.

PS 2: This doesn't fix the problem:

...
//String key = pageview.getUrl().toString();
//Long value = getDay(pageview.getTimestamp());
String key = "Dummy";
Long value = 1L;
return new Tuple2<>(key, value);
...


1 answer


I have come across this issue several times with Java code. Even when I was using Java serialization, I would either make the class that contains the function code Serializable or, if you don't want to do that, make the function a static member of the class. The static member works here because the anonymous Function holds an implicit reference to the enclosing ExampleSpark instance (the this$0 field in the serialization stack above), so Spark tries to serialize the whole outer class. Note that registerKryoClasses only affects how RDD data is serialized; closures are always serialized with Spark's Java closure serializer, which is why JavaSerializer shows up in the debugger.

Here is a code snippet of the solution.



public class Test {
  // A static member does not capture the enclosing instance, so Spark
  // only needs to serialize the Function itself (which is Serializable).
  private static final Function<Pageview, Tuple2<String, Long>> s =
      new Function<Pageview, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> call(Pageview pageview) throws Exception {
          String key = pageview.getUrl().toString();
          Long value = getDay(pageview.getTimestamp()); // getDay must be static as well
          return new Tuple2<>(key, value);
        }
      };
}
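
The other option mentioned above is to pull the function out into its own top-level class. Here is a minimal sketch under the question's assumptions; the class name PageviewToDay and the static helper DayUtil.getDay are made up for illustration. Since org.apache.spark.api.java.function.Function already extends java.io.Serializable, a named class like this serializes on its own, without any hidden reference to ExampleSpark:

import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

// A standalone class captures nothing from ExampleSpark, so serializing it
// does not drag the non-serializable outer class into the closure.
public class PageviewToDay implements Function<Pageview, Tuple2<String, Long>> {
  @Override
  public Tuple2<String, Long> call(Pageview pageview) throws Exception {
    String key = pageview.getUrl().toString();
    Long value = DayUtil.getDay(pageview.getTimestamp()); // hypothetical static helper
    return new Tuple2<>(key, value);
  }
}

It can then be used as myRDD.values().map(new PageviewToDay()).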

