Spark app runs locally, but not on a cluster

Hi, I am new to Spark and Scala, and I am trying to train a random forest classifier. Everything works when I run the program in local mode, but I get an error as soon as I run it on a cluster with 2 workers. I am launching the program from IntelliJ IDEA; the error is below. The Scala version is 2.10.6 and the Spark version is 2.1.1.
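
For context, the post does not include the build definition, so the following build.sbt is only an assumed sketch; it simply mirrors the Scala 2.10.6 / Spark 2.1.1 versions mentioned above, and the project name is guessed from the IdeaProjects/WordCount path that shows up in the log:

name := "WordCount"          // assumed; matches the warehouse path in the log

version := "0.1"             // assumed

scalaVersion := "2.10.6"

// spark-core provides SparkContext/SparkSession, spark-mllib provides
// RandomForest, LabeledPoint and Vectors. When the jar is later submitted
// with spark-submit these are usually marked "provided".
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "2.1.1",
  "org.apache.spark" %% "spark-mllib" % "2.1.1"
)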

I have trimmed the error trace:

17/07/01 18:28:54 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37625.
17/07/01 18:28:54 INFO NettyBlockTransferService: Server created on 192.168.1.5:37625
17/07/01 18:28:54 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/07/01 18:28:54 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.5, 37625, None)
17/07/01 18:28:54 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.5:37625 with 884.7 MB RAM, BlockManagerId(driver, 192.168.1.5, 37625, None)
17/07/01 18:28:54 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.5, 37625, None)
17/07/01 18:28:54 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.5, 37625, None)
17/07/01 18:28:54 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20170701182854-0000/0 on worker-20170701181736-192.168.1.5-46243 (192.168.1.5:46243) with 4 cores
17/07/01 18:28:54 INFO StandaloneSchedulerBackend: Granted executor ID app-20170701182854-0000/0 on hostPort 192.168.1.5:46243 with 4 cores, 1024.0 MB RAM
17/07/01 18:28:55 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20170701182854-0000/0 is now RUNNING
17/07/01 18:28:55 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
17/07/01 18:28:55 INFO SharedState: Warehouse path is 'file:/home/apostolis/IdeaProjects/WordCount/spark-warehouse/'.
17/07/01 18:28:56 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 236.5 KB, free 884.5 MB)
17/07/01 18:28:56 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 22.9 KB, free 884.4 MB)
17/07/01 18:28:56 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.5:37625 (size: 22.9 KB, free: 884.7 MB)
17/07/01 18:28:56 INFO SparkContext: Created broadcast 0 from textFile at CountWords.scala:31
17/07/01 18:28:56 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 236.5 KB, free 884.2 MB)
17/07/01 18:28:56 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 22.9 KB, free 884.2 MB)
17/07/01 18:28:56 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.5:37625 (size: 22.9 KB, free: 884.7 MB)
17/07/01 18:28:56 INFO SparkContext: Created broadcast 1 from textFile at CountWords.scala:37
17/07/01 18:28:56 INFO FileInputFormat: Total input paths to process : 1
17/07/01 18:28:57 INFO SparkContext: Starting job: take at DecisionTreeMetadata.scala:112
17/07/01 18:28:57 INFO DAGScheduler: Got job 0 (take at DecisionTreeMetadata.scala:112) with 1 output partitions
17/07/01 18:28:57 INFO DAGScheduler: Final stage: ResultStage 0 (take at DecisionTreeMetadata.scala:112)
17/07/01 18:28:57 INFO DAGScheduler: Parents of final stage: List()
17/07/01 18:28:57 INFO DAGScheduler: Missing parents: List()
17/07/01 18:28:57 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[12] at map at DecisionTreeMetadata.scala:112), which has no missing parents
17/07/01 18:28:57 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 4.6 KB, free 884.2 MB)
17/07/01 18:28:57 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.6 KB, free 884.2 MB)
17/07/01 18:28:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.5:37625 (size: 2.6 KB, free: 884.7 MB)
17/07/01 18:28:57 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:996
17/07/01 18:28:57 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[12] at map at DecisionTreeMetadata.scala:112)
17/07/01 18:28:57 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/07/01 18:28:57 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(null) (192.168.1.5:54248) with ID 0
17/07/01 18:28:58 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 192.168.1.5, executor 0, partition 0, PROCESS_LOCAL, 5995 bytes)
17/07/01 18:28:58 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.5:45601 with 366.3 MB RAM, BlockManagerId(0, 192.168.1.5, 45601, None)
17/07/01 18:28:58 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.5:45601 (size: 2.6 KB, free: 366.3 MB)
17/07/01 18:28:58 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 192.168.1.5, executor 0): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)


And here is the code:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.configuration.Strategy
import org.apache.spark.sql.SparkSession

object NewTest {

  // One row of the input data: ten integer features plus an integer label.
  // Defined at object level rather than inside main, the usual place for
  // classes that get serialized and shipped to executors.
  case class Record(x1: Int, x2: Int, x3: Int, x4: Int, x5: Int,
                    x6: Int, x7: Int, x8: Int, x9: Int, x10: Int, label: Int)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder
      .master("spark://192.168.1.5:7077")
      .appName("RandomForest")
      .getOrCreate()

    val sc = spark.sparkContext

    // Parse the comma-separated training file into Record objects
    val raw = sc.textFile("/home/apostolis/Desktop/poker_train.data").map(_.split(',')).map {
      case Array(x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, label) =>
        Record(x1.toInt, x2.toInt, x3.toInt, x4.toInt, x5.toInt,
               x6.toInt, x7.toInt, x8.toInt, x9.toInt, x10.toInt, label.toInt)
    }

    //raw.foreach(println)

    // Parse the test file the same way
    val raw1 = sc.textFile("/home/apostolis/Desktop/poker_test.data").map(_.split(',')).map {
      case Array(x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, label) =>
        Record(x1.toInt, x2.toInt, x3.toInt, x4.toInt, x5.toInt,
               x6.toInt, x7.toInt, x8.toInt, x9.toInt, x10.toInt, label.toInt)
    }

    // Convert each record into a LabeledPoint for the RDD-based MLlib API
    val train = raw.map(x => LabeledPoint(x.label,
      Vectors.dense(x.x1, x.x2, x.x3, x.x4, x.x5, x.x6, x.x7, x.x8, x.x9, x.x10)))

    // The test set is built from raw1 (the test file), not from raw again
    val test = raw1.map(x => LabeledPoint(x.label,
      Vectors.dense(x.x1, x.x2, x.x3, x.x4, x.x5, x.x6, x.x7, x.x8, x.x9, x.x10)))

    //train.foreach(println)

    val treeStrategy = Strategy.defaultStrategy("Classification")
    val numClasses = 10
    val numTrees = 600
    val featureSubsetStrategy = "auto"
    val categorical = Map[Int, Int]()   // no categorical features
    val impurity = "gini"
    val maxDepth = 4
    val maxBins = 32
    //val model = RandomForest.trainClassifier(train, treeStrategy, numTrees, featureSubsetStrategy, seed = 1234)
    val model = RandomForest.trainClassifier(train, numClasses, categorical, numTrees,
      featureSubsetStrategy, impurity, maxDepth, maxBins, seed = 124)

    // Score the test set, pairing each true label with the model's prediction
    val labels = test.map { point =>
      val prediction = model.predict(point.features)
      (point.label, prediction)
    }

    //labels.foreach(println)

    // Fraction of test points that were misclassified
    val testError = labels.filter(r => r._1 != r._2).count.toDouble / test.count()

    println(testError)
  }
}
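
One detail that may matter here: when the job is launched directly from IntelliJ instead of via spark-submit, the compiled application jar is not shipped to the standalone workers automatically, and missing application classes on the executor classpath is a common cause of ClassCastException errors like the one above. A minimal sketch of passing the jar along, with a hypothetical path to the packaged artifact, would be:

import org.apache.spark.sql.SparkSession

// Sketch only: the jar path below is hypothetical and must point at the
// artifact actually produced by the build (e.g. by "sbt package").
val spark = SparkSession.builder
  .master("spark://192.168.1.5:7077")
  .appName("RandomForest")
  .config("spark.jars", "/home/apostolis/IdeaProjects/WordCount/target/scala-2.10/wordcount_2.10-0.1.jar")
  .getOrCreate()

Submitting the packaged jar with spark-submit instead of running from the IDE would have the same effect, since spark-submit distributes the application jar to the executors itself.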
