Why does Spark broadcast not work when my object extends scala.App?

The first snippet throws a NullPointerException.

import org.apache.spark.{SparkConf, SparkContext}

object TryBroadcast extends App {
  val conf = new SparkConf().setAppName("o_o")
  val sc = new SparkContext(conf)
  val sample = sc.parallelize(1 to 1024)
  val bro = sc.broadcast(6666)
  val broSample = sample.map(x => x.toString + bro.value)
  broSample.collect().foreach(println)
}

The second one works fine.

import org.apache.spark.{SparkConf, SparkContext}

object TryBroadcast {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("o_o")
    val sc = new SparkContext(conf)
    val sample = sc.parallelize(1 to 1024)
    val bro = sc.broadcast(6666)
    val broSample = sample.map(x => x.toString + bro.value)
    broSample.collect().foreach(println)
  }
}

It seems that broadcast somehow conflicts with scala.App.

Scala version: 2.10.5. Spark version: 1.4.0. Stack trace:

java.lang.NullPointerException
    at TryBroadcast$$anonfun$1.apply(TryBroadcast.scala:11)
    at TryBroadcast$$anonfun$1.apply(TryBroadcast.scala:11)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

2 answers


bro is completely different in the two cases. In the first case it is a field of the singleton object (TryBroadcast); in the second it is a local variable.

The local variable gets captured by the closure, serialized, and sent to the executors. In the first case the closure references the field through the singleton, so the singleton itself gets captured and sent. I'm not sure exactly how a Scala singleton is constructed on the executor and how it gets captured, but apparently in this case it ends up uninitialized when the field is accessed there.
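The underlying gotcha is that scala.App relies on DelayedInit: the body of an App object (including the assignment to bro) only runs when its main() is invoked, and main() is never invoked on the executors, so the field is still null there. A minimal Spark-free sketch of this behavior, using made-up object names DelayedDemo and Probe for illustration:

// Sketch only: illustrates the DelayedInit behavior behind the NPE,
// assuming Scala 2.10/2.11 semantics. DelayedDemo and Probe are
// hypothetical names, not part of the original code.
object DelayedDemo extends App {
  val field = "initialized" // runs only when DelayedDemo.main is called
}

object Probe {
  def main(args: Array[String]): Unit = {
    // DelayedDemo's constructor has run, but its App body has not,
    // so field still holds the JVM default value (null).
    println(DelayedDemo.field) // prints "null"
  }
}

On the executors the TryBroadcast singleton is presumably reconstructed the same way: its constructor runs, but main never does, so bro is still null when the task calls bro.value.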

You can make bro a local variable, so the closure captures the broadcast handle directly instead of reaching it through the singleton:

import org.apache.spark.{SparkConf, SparkContext}

object TryBroadcast extends App {
  val conf = new SparkConf().setAppName("o_o")
  val sc = new SparkContext(conf)
  val sample = sc.parallelize(1 to 1024)
  // Keep the broadcast handle in a local scope so the closure
  // captures the local value rather than the App singleton.
  val broSample = {
    val bro = sc.broadcast(6666)
    sample.map(x => x.toString + bro.value)
  }
  broSample.collect().foreach(println)
}

It is not well documented, but it is recommended to use def main(args: Array[String]): Unit = ??? instead of extends App.

See https://issues.apache.org/jira/browse/SPARK-4170 and https://github.com/apache/spark/pull/3497
