How do I use foreach (to add each entry to Solr) inside foreachRDD in Spark Streaming?
I am running into a problem with the foreachRDD design pattern for Spark Streaming. I applied the pattern recommended in the programming guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html
- Sample code from the guide
dstream.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
  })
})
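The guide does not show what ConnectionPool looks like. For reference, here is a minimal sketch of such a pool; the Connection trait and its stub send method are placeholders I made up for illustration, not a real API:

import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical connection type, for illustration only.
trait Connection extends Serializable {
  def send(record: String): Unit
}

// Held in a Scala object so it is initialized lazily, once per executor JVM;
// connections are created where they are used and are never shipped from the driver.
object ConnectionPool {
  private val pool = new ConcurrentLinkedQueue[Connection]()

  def getConnection(): Connection =
    Option(pool.poll()).getOrElse(new Connection {
      def send(record: String): Unit = println(s"sending: $record") // stub
    })

  def returnConnection(conn: Connection): Unit = pool.offer(conn)
}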
- My code
dstream.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    // SolrConnectionPool is a static, lazily initialized pool of CloudSolrServer clients
    val connection = SolrConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.add(makeSolrInputDocument(record)))
    SolrConnectionPool.returnConnection(connection) // return to the pool for future reuse
  })
})
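For context, SolrConnectionPool and makeSolrInputDocument look roughly like the sketch below; the ZooKeeper address, collection name, and field names are placeholders for my actual configuration:

import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.solr.client.solrj.impl.CloudSolrServer
import org.apache.solr.common.SolrInputDocument

// Held in a Scala object so each executor JVM initializes its own pool lazily.
object SolrConnectionPool {
  private val zkHost = "localhost:2181"   // placeholder: ZooKeeper ensemble of the SolrCloud cluster
  private val collection = "collection1"  // placeholder: target collection

  private val pool = new ConcurrentLinkedQueue[CloudSolrServer]()

  def getConnection(): CloudSolrServer =
    Option(pool.poll()).getOrElse {
      val server = new CloudSolrServer(zkHost)
      server.setDefaultCollection(collection)
      println("init cloudSolrServer ===== > " + server) // produces the log lines below
      server
    }

  def returnConnection(server: CloudSolrServer): Unit = pool.offer(server)
}

// Placeholder mapping from a raw record to a Solr document.
def makeSolrInputDocument(record: String): SolrInputDocument = {
  val doc = new SolrInputDocument()
  doc.addField("id", java.util.UUID.randomUUID().toString)
  doc.addField("message_t", record)
  doc
}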
**Error logs received**
> log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
init cloudSolrServer ===== > org.apache.solr.client.solrj.impl.CloudSolrServer@157dbcd4
init cloudSolrServer ===== > org.apache.solr.client.solrj.impl.CloudSolrServer@6fa2fa45
init cloudSolrServer ===== > org.apache.solr.client.solrj.impl.CloudSolrServer@5802ffe7
...................... (skip)
14/10/17 13:22:01 INFO JobScheduler: Finished job streaming job 1413519720000 ms.0 from job set of time 1413519720000 ms
14/10/17 13:22:01 INFO JobScheduler: Starting job streaming job 1413519720000 ms.1 from job set of time 1413519720000 ms
14/10/17 13:22:01 INFO SparkContext: Starting job: foreachPartition at SbclogCep.scala:49
14/10/17 13:22:01 INFO DAGScheduler: Got job 1 (foreachPartition at SbclogCep.scala:49) with 1 output partitions (allowLocal=false)
14/10/17 13:22:01 INFO DAGScheduler: Final stage: Stage 1(foreachPartition at SbclogCep.scala:49)
-------------------------------------------
Time: 1413519730000 ms
-------------------------------------------
14/10/17 13:22:57 INFO SparkContext: Starting job: foreachPartition at SbclogCep.scala:49
14/10/17 13:22:57 INFO TaskSchedulerImpl: Cancelling stage 1
14/10/17 13:22:57 INFO JobScheduler: Starting job streaming job 1413519730000 ms.0 from job set of time 1413519730000 ms
14/10/17 13:22:57 INFO JobScheduler: Finished job streaming job 1413519730000 ms.0 from job set of time 1413519730000 ms
14/10/17 13:22:57 INFO JobScheduler: Starting job streaming job 1413519730000 ms.1 from job set of time 1413519730000 ms
14/10/17 13:22:57 ERROR JobScheduler: Error running job streaming job 1413519720000 ms.1
org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/10/17 13:22:57 INFO SparkContext: Job finished: foreachPartition at SbclogCep.scala:49, took 2.6276E-5 s
-------------------------------------------
Time: 1413519740000 ms
-------------------------------------------
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
foreachRDD is not working. What am I doing wrong, and how do I get each record indexed into Solr? Please help.