NoSuchMethodError in shapeless visibility only in Spark
I am trying to write a Spark connector to pull AVRO messages from the RabbitMQ message queue. When decoding AVRO messages, a NoSuchMethodError is thrown, which only occurs when run in Spark.
I couldn't reproduce the spark code exactly outside the spark, but I believe the two examples are quite similar. I think this is the smallest code that reproduces the same scenario.
I removed all connection parameters as because the information is private and the connection doesn't seem to be a problem.
Spark code:
package simpleexample
import org.apache.spark.SparkConf
import org.apache.spark.streaming.rabbitmq.distributed.RabbitMQDistributedKey
import org.apache.spark.streaming.rabbitmq.models.ExchangeAndRouting
import org.apache.spark.streaming.rabbitmq.RabbitMQUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import com.sksamuel.avro4s._
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.rabbitmq.client.QueueingConsumer.Delivery
import java.util.HashMap
case class AttributeTuple(attrName: String, attrValue: String)
// AVRO Schema for Events
case class DeviceEvent(
tenantName: String,
groupName: String,
subgroupName: String,
eventType: String,
eventSource: String,
deviceTypeName: String,
deviceId: Int,
timestamp: Long,
attribute: AttributeTuple
)
object RabbitMonitor {
def main(args: Array[String]) {
println("start")
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RabbitMonitor")
val ssc = new StreamingContext(sparkConf, Seconds(60))
def parseArrayEvent(delivery: Delivery): Seq[DeviceEvent] = {
val in = new ByteArrayInputStream(delivery.getBody())
val input = AvroInputStream.binary[DeviceEvent](in)
input.iterator.toSeq
}
val params: Map[String, String] = Map(
/* many rabbit connection parameters */
"maxReceiveTime" -> "60000" // 60s
)
val distributedKey = Seq(
RabbitMQDistributedKey(
/* queue name */,
new ExchangeAndRouting(/* exchange name */, /* routing key */),
params
)
)
var events = RabbitMQUtils.createDistributedStream[Seq[DeviceEvent]](ssc, distributedKey, params, parseArrayEvent)
events.print()
ssc.start()
ssc.awaitTermination()
}
}
No spark code:
package simpleexample
import com.thenewmotion.akka.rabbitmq._
import akka.actor._
// avoid name collision with rabbitmq channel
import scala.concurrent.{Channel => BasicChannel}
import scala.concurrent.ExecutionContext.Implicits.global
import com.sksamuel.avro4s._
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
object Test extends App {
implicit val system = ActorSystem()
val factory = new ConnectionFactory()
/* Set connection parameters*/
val exchange: String = /* exchange name */
val connection: ActorRef = system.actorOf(ConnectionActor.props(factory), "rabbitmq")
def setupSubscriber(channel: Channel, self: ActorRef) {
val queue = channel.queueDeclare().getQueue
channel.queueBind(queue, exchange, /* routing key */)
val consumer = new DefaultConsumer(channel) {
override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]) {
val in = new ByteArrayInputStream(body)
val input = AvroInputStream.binary[DeviceEvent](in)
val result = input.iterator.toSeq
println(result)
}
}
channel.basicConsume(queue, true, consumer)
}
connection ! CreateChannel(ChannelActor.props(setupSubscriber), Some("eventSubscriber"))
scala.concurrent.Future {
def loop(n: Long) {
Thread.sleep(1000)
if (n < 30) {
loop(n + 1)
}
}
loop(0)
}
}
Output without spark (last line is a successfully decoded update):
drex@drexThinkPad:~/src/scala/so-repro/connector/target/scala-2.11$ scala project.jar
[INFO] [03/02/2017 14:11:06.899] [default-akka.actor.default-dispatcher-4] [akka://default/deadLetters] Message [com.thenewmotion.akka.rabbitmq.ChannelCreated] from Actor[akka://default/user/rabbitmq#-889215077] to Actor[akka://default/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [03/02/2017 14:11:07.337] [default-akka.actor.default-dispatcher-3] [akka://default/user/rabbitmq] akka://default/user/rabbitmq connected to amqp://<rabbit info>
[INFO] [03/02/2017 14:11:07.509] [default-akka.actor.default-dispatcher-4] [akka://default/user/rabbitmq/eventSubscriber] akka://default/user/rabbitmq/eventSubscriber connected
Stream(DeviceEvent(int,na,d01,deviceAttrUpdate,device,TestDeviceType,33554434,1488492704421,AttributeTuple(temperature,60)), ?)
Sparks:
drex@drexThinkPad:~/src/scala/so-repro/connector/target/scala-2.11$ spark-submit ./project.jar --class RabbitMonitor
start
Using Spark default log4j profile: org/apache/spark/log4j-defaults.properties
17/03/02 14:20:15 INFO SparkContext: Running Spark version 2.1.0
17/03/02 14:20:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/02 14:20:16 WARN Utils: Your hostname, drexThinkPad resolves to a loopback address: 127.0.1.1; using 192.168.1.11 instead (on interface wlp3s0)
17/03/02 14:20:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/03/02 14:20:16 INFO SecurityManager: Changing view acls to: drex
17/03/02 14:20:16 INFO SecurityManager: Changing modify acls to: drex
17/03/02 14:20:16 INFO SecurityManager: Changing view acls groups to:
17/03/02 14:20:16 INFO SecurityManager: Changing modify acls groups to:
17/03/02 14:20:16 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(drex); groups with view permissions: Set(); users with modify permissions: Set(drex); groups with modify permissions: Set()
17/03/02 14:20:16 INFO Utils: Successfully started service 'sparkDriver' on port 34701.
17/03/02 14:20:16 INFO SparkEnv: Registering MapOutputTracker
17/03/02 14:20:16 INFO SparkEnv: Registering BlockManagerMaster
17/03/02 14:20:16 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/03/02 14:20:16 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/03/02 14:20:16 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-5cbb13bf-78fe-4227-81b3-1afea40f899a
17/03/02 14:20:16 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
17/03/02 14:20:16 INFO SparkEnv: Registering OutputCommitCoordinator
17/03/02 14:20:16 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/03/02 14:20:16 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.11:4040
17/03/02 14:20:16 INFO SparkContext: Added JAR file:/home/drex/src/scala/so-repro/connector/target/scala-2.11/./project.jar at spark://192.168.1.11:34701/jars/project.jar with timestamp 1488493216614
17/03/02 14:20:16 INFO Executor: Starting executor ID driver on host localhost
17/03/02 14:20:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 33276.
17/03/02 14:20:16 INFO NettyBlockTransferService: Server created on 192.168.1.11:33276
17/03/02 14:20:16 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/03/02 14:20:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.11, 33276, None)
17/03/02 14:20:16 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.11:33276 with 366.3 MB RAM, BlockManagerId(driver, 192.168.1.11, 33276, None)
17/03/02 14:20:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.11, 33276, None)
17/03/02 14:20:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.11, 33276, None)
17/03/02 14:20:17 INFO RabbitMQDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.rabbitmq.distributed.RabbitMQDStream@546621c4
17/03/02 14:20:17 INFO RabbitMQDStream: Slide time = 60000 ms
17/03/02 14:20:17 INFO RabbitMQDStream: Storage level = Memory Deserialized 1x Replicated
17/03/02 14:20:17 INFO RabbitMQDStream: Checkpoint interval = null
17/03/02 14:20:17 INFO RabbitMQDStream: Remember interval = 60000 ms
17/03/02 14:20:17 INFO RabbitMQDStream: Initialized and validated org.apache.spark.streaming.rabbitmq.distributed.RabbitMQDStream@546621c4
17/03/02 14:20:17 INFO ForEachDStream: Slide time = 60000 ms
17/03/02 14:20:17 INFO ForEachDStream: Storage level = Serialized 1x Replicated
17/03/02 14:20:17 INFO ForEachDStream: Checkpoint interval = null
17/03/02 14:20:17 INFO ForEachDStream: Remember interval = 60000 ms
17/03/02 14:20:17 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@49c6ddef
17/03/02 14:20:17 INFO RecurringTimer: Started timer for JobGenerator at time 1488493260000
17/03/02 14:20:17 INFO JobGenerator: Started JobGenerator at 1488493260000 ms
17/03/02 14:20:17 INFO JobScheduler: Started JobScheduler
17/03/02 14:20:17 INFO StreamingContext: StreamingContext started
17/03/02 14:21:00 INFO JobScheduler: Added jobs for time 1488493260000 ms
17/03/02 14:21:00 INFO JobScheduler: Starting job streaming job 1488493260000 ms.0 from job set of time 1488493260000 ms
17/03/02 14:21:00 INFO SparkContext: Starting job: print at RabbitMonitor.scala:94
17/03/02 14:21:00 INFO DAGScheduler: Got job 0 (print at RabbitMonitor.scala:94) with 1 output partitions
17/03/02 14:21:00 INFO DAGScheduler: Final stage: ResultStage 0 (print at RabbitMonitor.scala:94)
17/03/02 14:21:00 INFO DAGScheduler: Parents of final stage: List()
17/03/02 14:21:00 INFO DAGScheduler: Missing parents: List()
17/03/02 14:21:00 INFO DAGScheduler: Submitting ResultStage 0 (RabbitMQRDD[0] at createDistributedStream at RabbitMonitor.scala:93), which has no missing parents
17/03/02 14:21:00 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.7 KB, free 366.3 MB)
17/03/02 14:21:00 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1752.0 B, free 366.3 MB)
17/03/02 14:21:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.11:33276 (size: 1752.0 B, free: 366.3 MB)
17/03/02 14:21:00 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:996
17/03/02 14:21:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (RabbitMQRDD[0] at createDistributedStream at RabbitMonitor.scala:93)
17/03/02 14:21:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/03/02 14:21:00 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 7744 bytes)
17/03/02 14:21:00 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/03/02 14:21:00 INFO Executor: Fetching spark://192.168.1.11:34701/jars/project.jar with timestamp 1488493216614
17/03/02 14:21:00 INFO TransportClientFactory: Successfully created connection to /192.168.1.11:34701 after 23 ms (0 ms spent in bootstraps)
17/03/02 14:21:00 INFO Utils: Fetching spark://192.168.1.11:34701/jars/project.jar to /tmp/spark-92b6ff6a-b120-4fd0-ba46-a450eff80636/userFiles-c0a334f3-68fc-495f-8ccd-cfe90e6d0bf8/fetchFileTemp2710654534934784726.tmp
17/03/02 14:21:00 INFO Executor: Adding file:/tmp/spark-92b6ff6a-b120-4fd0-ba46-a450eff80636/userFiles-c0a334f3-68fc-495f-8ccd-cfe90e6d0bf8/project.jar to class loader
<removing rabbit queue connection parameters>
17/03/02 14:21:02 INFO RabbitMQRDD: Receiving data in Partition 0 from
</removing rabbit queue connection parameters>
17/03/02 14:21:50 WARN BlockManager: Putting block rdd_0_0 failed due to an exception
17/03/02 14:21:50 WARN BlockManager: Block rdd_0_0 could not be removed as it was not found on disk or in memory
17/03/02 14:21:50 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodError: shapeless.Lazy.map(Lscala/Function1;)Lshapeless/Lazy;
at com.sksamuel.avro4s.SchemaFor$.recordBuilder(SchemaFor.scala:447)
at simpleexample.RabbitMonitor$$anon$3.<init>(RabbitMonitor.scala:70)
at simpleexample.RabbitMonitor$.simpleexample$RabbitMonitor$$parseArrayEvent$1(RabbitMonitor.scala:70)
at simpleexample.RabbitMonitor$$anonfun$15.apply(RabbitMonitor.scala:93)
at simpleexample.RabbitMonitor$$anonfun$15.apply(RabbitMonitor.scala:93)
at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator$$anonfun$5.apply(RabbitMQRDD.scala:209)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.processDelivery(RabbitMQRDD.scala:209)
at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.getNext(RabbitMQRDD.scala:194)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/03/02 14:21:50 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NoSuchMethodError: shapeless.Lazy.map(Lscala/Function1;)Lshapeless/Lazy;
at com.sksamuel.avro4s.SchemaFor$.recordBuilder(SchemaFor.scala:447)
at simpleexample.RabbitMonitor$$anon$3.<init>(RabbitMonitor.scala:70)
at simpleexample.RabbitMonitor$.simpleexample$RabbitMonitor$$parseArrayEvent$1(RabbitMonitor.scala:70)
at simpleexample.RabbitMonitor$$anonfun$15.apply(RabbitMonitor.scala:93)
at simpleexample.RabbitMonitor$$anonfun$15.apply(RabbitMonitor.scala:93)
at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator$$anonfun$5.apply(RabbitMQRDD.scala:209)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.processDelivery(RabbitMQRDD.scala:209)
at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.getNext(RabbitMQRDD.scala:194)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/03/02 14:21:50 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
17/03/02 14:21:50 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/03/02 14:21:50 INFO TaskSchedulerImpl: Cancelling stage 0
build.sbt:
retrieveManaged := true
lazy val sparkVersion = "2.1.0"
scalaVersion in ThisBuild := "2.11.8"
lazy val rabbit = (project in file("rabbit-plugin")).settings(
name := "Spark Streaming RabbitMQ Receiver",
homepage := Some(url("https://github.com/Stratio/RabbitMQ-Receiver")),
description := "RabbitMQ-Receiver is a library that allows the user to read data with Apache Spark from RabbitMQ.",
exportJars := true,
assemblyJarName in assembly := "rabbit.jar",
test in assembly := {},
moduleName := "spark-rabbitmq",
organization := "com.stratio.receive",
version := "0.6.0",
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"com.typesafe.akka" %% "akka-actor" % "2.4.11",
"com.rabbitmq" % "amqp-client" % "3.6.6",
"joda-time" % "joda-time" % "2.8.2",
"com.github.sstone" %% "amqp-client" % "1.5" % Test,
"org.scalatest" %% "scalatest" % "2.2.2" % Test,
"org.scalacheck" %% "scalacheck" % "1.11.3" % Test,
"junit" % "junit" % "4.12" % Test,
"com.typesafe.akka" %% "akka-testkit" % "2.4.11" % Test
)
)
lazy val root = (project in file("connector")).settings(
name := "Connector from Rabbit to Kafka queue",
description := "",
exportJars := true,
test in assembly := {},
assemblyJarName in assembly := "project.jar",
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
"com.thenewmotion" %% "akka-rabbitmq" % "3.0.0",
"org.apache.kafka" % "kafka_2.10" % "0.10.1.1",
"com.sksamuel.avro4s" %% "avro4s-core" % "1.6.4"
)
) dependsOn rabbit
I also use the assembly to assemble the "fat jar" for the spark ( addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")
) and use the sbt assembly
jar command used in both examples above. I am running Spark 2.1.0.
I'm relatively new to the Spark / Scala ecosystem, so hopefully this is a problem with my build settings. It makes no sense that the formless would be unavailable in Spark.
source to share
zero323 has the correct answer as far as I can tell. Spark 2.1.0 has a dependency that itself depends on Shapeless 2.0.0.
This problem can be solved in one of two ways: import a dependency that uses Shapeless and shapeless shade , or use another avro library. I went with the last solution.
source to share