NoSuchMethodError in shapeless seen only in Spark

I am trying to write a Spark connector to pull Avro messages from a RabbitMQ message queue. When decoding the Avro messages, a NoSuchMethodError is thrown, but only when the code runs in Spark.

I couldn't reproduce the Spark code exactly outside of Spark, but I believe the two examples are quite similar. I think this is the smallest code that reproduces the scenario.

I removed all connection parameters because the information is private and the connection doesn't seem to be the problem.

Spark code:

package simpleexample

import org.apache.spark.SparkConf
import org.apache.spark.streaming.rabbitmq.distributed.RabbitMQDistributedKey
import org.apache.spark.streaming.rabbitmq.models.ExchangeAndRouting
import org.apache.spark.streaming.rabbitmq.RabbitMQUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import com.sksamuel.avro4s._

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.rabbitmq.client.QueueingConsumer.Delivery
import java.util.HashMap


case class AttributeTuple(attrName: String, attrValue: String)

// AVRO Schema for Events

case class DeviceEvent(
    tenantName: String, 
    groupName: String, 
    subgroupName: String, 
    eventType: String, 
    eventSource: String,
    deviceTypeName: String,
    deviceId: Int,
    timestamp: Long,
    attribute: AttributeTuple
)

object RabbitMonitor {
  def main(args: Array[String]): Unit = {
    println("start")

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RabbitMonitor")
    val ssc = new StreamingContext(sparkConf, Seconds(60))

    def parseArrayEvent(delivery: Delivery): Seq[DeviceEvent] = {
        val in = new ByteArrayInputStream(delivery.getBody())
        val input = AvroInputStream.binary[DeviceEvent](in)
        input.iterator.toSeq
    }

    val params: Map[String, String] = Map(
        /* many rabbit connection parameters */
        "maxReceiveTime" -> "60000" // 60s
      )

    val distributedKey = Seq(
        RabbitMQDistributedKey(
          /* queue name */, 
          new ExchangeAndRouting(/* exchange name */, /* routing key */),
          params
        )
    )

    val events = RabbitMQUtils.createDistributedStream[Seq[DeviceEvent]](ssc, distributedKey, params, parseArrayEvent)
    events.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

      

Non-Spark code:

package simpleexample

import com.thenewmotion.akka.rabbitmq._
import akka.actor._
// avoid name collision with rabbitmq channel
import scala.concurrent.{Channel => BasicChannel}
import scala.concurrent.ExecutionContext.Implicits.global

import com.sksamuel.avro4s._
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

object Test extends App {
    implicit val system = ActorSystem()

    val factory = new ConnectionFactory()
    /* Set connection parameters*/
    val exchange: String = /* exchange name */

    val connection: ActorRef = system.actorOf(ConnectionActor.props(factory), "rabbitmq")

    def setupSubscriber(channel: Channel, self: ActorRef): Unit = {
        val queue = channel.queueDeclare().getQueue
        channel.queueBind(queue, exchange, /* routing key */)
        val consumer = new DefaultConsumer(channel) {
          override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]) {
            val in = new ByteArrayInputStream(body)
            val input = AvroInputStream.binary[DeviceEvent](in)
            val result = input.iterator.toSeq
            println(result)
          }
        }

        channel.basicConsume(queue, true, consumer)
      }

    connection ! CreateChannel(ChannelActor.props(setupSubscriber), Some("eventSubscriber"))


    scala.concurrent.Future {
        def loop(n: Long) {
            Thread.sleep(1000)
            if (n < 30) {
                loop(n + 1)
            }
        }
        loop(0)
    }
}

      

Output without Spark (the last line is a successfully decoded update):

drex@drexThinkPad:~/src/scala/so-repro/connector/target/scala-2.11$ scala project.jar 
[INFO] [03/02/2017 14:11:06.899] [default-akka.actor.default-dispatcher-4] [akka://default/deadLetters] Message [com.thenewmotion.akka.rabbitmq.ChannelCreated] from Actor[akka://default/user/rabbitmq#-889215077] to Actor[akka://default/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [03/02/2017 14:11:07.337] [default-akka.actor.default-dispatcher-3] [akka://default/user/rabbitmq] akka://default/user/rabbitmq connected to amqp://<rabbit info>
[INFO] [03/02/2017 14:11:07.509] [default-akka.actor.default-dispatcher-4] [akka://default/user/rabbitmq/eventSubscriber] akka://default/user/rabbitmq/eventSubscriber connected
Stream(DeviceEvent(int,na,d01,deviceAttrUpdate,device,TestDeviceType,33554434,1488492704421,AttributeTuple(temperature,60)), ?)

      

Output with Spark:

drex@drexThinkPad:~/src/scala/so-repro/connector/target/scala-2.11$ spark-submit ./project.jar --class RabbitMonitor
start
Using Spark default log4j profile: org/apache/spark/log4j-defaults.properties
17/03/02 14:20:15 INFO SparkContext: Running Spark version 2.1.0
17/03/02 14:20:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/02 14:20:16 WARN Utils: Your hostname, drexThinkPad resolves to a loopback address: 127.0.1.1; using 192.168.1.11 instead (on interface wlp3s0)
17/03/02 14:20:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/03/02 14:20:16 INFO SecurityManager: Changing view acls to: drex
17/03/02 14:20:16 INFO SecurityManager: Changing modify acls to: drex
17/03/02 14:20:16 INFO SecurityManager: Changing view acls groups to: 
17/03/02 14:20:16 INFO SecurityManager: Changing modify acls groups to: 
17/03/02 14:20:16 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(drex); groups with view permissions: Set(); users  with modify permissions: Set(drex); groups with modify permissions: Set()
17/03/02 14:20:16 INFO Utils: Successfully started service 'sparkDriver' on port 34701.
17/03/02 14:20:16 INFO SparkEnv: Registering MapOutputTracker
17/03/02 14:20:16 INFO SparkEnv: Registering BlockManagerMaster
17/03/02 14:20:16 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/03/02 14:20:16 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/03/02 14:20:16 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-5cbb13bf-78fe-4227-81b3-1afea40f899a
17/03/02 14:20:16 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
17/03/02 14:20:16 INFO SparkEnv: Registering OutputCommitCoordinator
17/03/02 14:20:16 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/03/02 14:20:16 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.11:4040
17/03/02 14:20:16 INFO SparkContext: Added JAR file:/home/drex/src/scala/so-repro/connector/target/scala-2.11/./project.jar at spark://192.168.1.11:34701/jars/project.jar with timestamp 1488493216614
17/03/02 14:20:16 INFO Executor: Starting executor ID driver on host localhost
17/03/02 14:20:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 33276.
17/03/02 14:20:16 INFO NettyBlockTransferService: Server created on 192.168.1.11:33276
17/03/02 14:20:16 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/03/02 14:20:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.11, 33276, None)
17/03/02 14:20:16 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.11:33276 with 366.3 MB RAM, BlockManagerId(driver, 192.168.1.11, 33276, None)
17/03/02 14:20:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.11, 33276, None)
17/03/02 14:20:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.11, 33276, None)
17/03/02 14:20:17 INFO RabbitMQDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.rabbitmq.distributed.RabbitMQDStream@546621c4
17/03/02 14:20:17 INFO RabbitMQDStream: Slide time = 60000 ms
17/03/02 14:20:17 INFO RabbitMQDStream: Storage level = Memory Deserialized 1x Replicated
17/03/02 14:20:17 INFO RabbitMQDStream: Checkpoint interval = null
17/03/02 14:20:17 INFO RabbitMQDStream: Remember interval = 60000 ms
17/03/02 14:20:17 INFO RabbitMQDStream: Initialized and validated org.apache.spark.streaming.rabbitmq.distributed.RabbitMQDStream@546621c4
17/03/02 14:20:17 INFO ForEachDStream: Slide time = 60000 ms
17/03/02 14:20:17 INFO ForEachDStream: Storage level = Serialized 1x Replicated
17/03/02 14:20:17 INFO ForEachDStream: Checkpoint interval = null
17/03/02 14:20:17 INFO ForEachDStream: Remember interval = 60000 ms
17/03/02 14:20:17 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@49c6ddef
17/03/02 14:20:17 INFO RecurringTimer: Started timer for JobGenerator at time 1488493260000
17/03/02 14:20:17 INFO JobGenerator: Started JobGenerator at 1488493260000 ms
17/03/02 14:20:17 INFO JobScheduler: Started JobScheduler
17/03/02 14:20:17 INFO StreamingContext: StreamingContext started
17/03/02 14:21:00 INFO JobScheduler: Added jobs for time 1488493260000 ms
17/03/02 14:21:00 INFO JobScheduler: Starting job streaming job 1488493260000 ms.0 from job set of time 1488493260000 ms
17/03/02 14:21:00 INFO SparkContext: Starting job: print at RabbitMonitor.scala:94
17/03/02 14:21:00 INFO DAGScheduler: Got job 0 (print at RabbitMonitor.scala:94) with 1 output partitions
17/03/02 14:21:00 INFO DAGScheduler: Final stage: ResultStage 0 (print at RabbitMonitor.scala:94)
17/03/02 14:21:00 INFO DAGScheduler: Parents of final stage: List()
17/03/02 14:21:00 INFO DAGScheduler: Missing parents: List()
17/03/02 14:21:00 INFO DAGScheduler: Submitting ResultStage 0 (RabbitMQRDD[0] at createDistributedStream at RabbitMonitor.scala:93), which has no missing parents
17/03/02 14:21:00 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.7 KB, free 366.3 MB)
17/03/02 14:21:00 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1752.0 B, free 366.3 MB)
17/03/02 14:21:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.11:33276 (size: 1752.0 B, free: 366.3 MB)
17/03/02 14:21:00 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:996
17/03/02 14:21:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (RabbitMQRDD[0] at createDistributedStream at RabbitMonitor.scala:93)
17/03/02 14:21:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/03/02 14:21:00 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 7744 bytes)
17/03/02 14:21:00 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/03/02 14:21:00 INFO Executor: Fetching spark://192.168.1.11:34701/jars/project.jar with timestamp 1488493216614
17/03/02 14:21:00 INFO TransportClientFactory: Successfully created connection to /192.168.1.11:34701 after 23 ms (0 ms spent in bootstraps)
17/03/02 14:21:00 INFO Utils: Fetching spark://192.168.1.11:34701/jars/project.jar to /tmp/spark-92b6ff6a-b120-4fd0-ba46-a450eff80636/userFiles-c0a334f3-68fc-495f-8ccd-cfe90e6d0bf8/fetchFileTemp2710654534934784726.tmp
17/03/02 14:21:00 INFO Executor: Adding file:/tmp/spark-92b6ff6a-b120-4fd0-ba46-a450eff80636/userFiles-c0a334f3-68fc-495f-8ccd-cfe90e6d0bf8/project.jar to class loader
<removing rabbit queue connection parameters>
17/03/02 14:21:02 INFO RabbitMQRDD: Receiving data in Partition 0 from  
</removing rabbit queue connection parameters>
17/03/02 14:21:50 WARN BlockManager: Putting block rdd_0_0 failed due to an exception
17/03/02 14:21:50 WARN BlockManager: Block rdd_0_0 could not be removed as it was not found on disk or in memory
17/03/02 14:21:50 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodError: shapeless.Lazy.map(Lscala/Function1;)Lshapeless/Lazy;
    at com.sksamuel.avro4s.SchemaFor$.recordBuilder(SchemaFor.scala:447)
    at simpleexample.RabbitMonitor$$anon$3.<init>(RabbitMonitor.scala:70)
    at simpleexample.RabbitMonitor$.simpleexample$RabbitMonitor$$parseArrayEvent$1(RabbitMonitor.scala:70)
    at simpleexample.RabbitMonitor$$anonfun$15.apply(RabbitMonitor.scala:93)
    at simpleexample.RabbitMonitor$$anonfun$15.apply(RabbitMonitor.scala:93)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator$$anonfun$5.apply(RabbitMQRDD.scala:209)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.processDelivery(RabbitMQRDD.scala:209)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.getNext(RabbitMQRDD.scala:194)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/03/02 14:21:50 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NoSuchMethodError: shapeless.Lazy.map(Lscala/Function1;)Lshapeless/Lazy;
    at com.sksamuel.avro4s.SchemaFor$.recordBuilder(SchemaFor.scala:447)
    at simpleexample.RabbitMonitor$$anon$3.<init>(RabbitMonitor.scala:70)
    at simpleexample.RabbitMonitor$.simpleexample$RabbitMonitor$$parseArrayEvent$1(RabbitMonitor.scala:70)
    at simpleexample.RabbitMonitor$$anonfun$15.apply(RabbitMonitor.scala:93)
    at simpleexample.RabbitMonitor$$anonfun$15.apply(RabbitMonitor.scala:93)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator$$anonfun$5.apply(RabbitMQRDD.scala:209)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.processDelivery(RabbitMQRDD.scala:209)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.getNext(RabbitMQRDD.scala:194)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/03/02 14:21:50 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
17/03/02 14:21:50 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
17/03/02 14:21:50 INFO TaskSchedulerImpl: Cancelling stage 0

      

build.sbt:

retrieveManaged := true

lazy val sparkVersion = "2.1.0"

scalaVersion in ThisBuild := "2.11.8"

lazy val rabbit = (project in file("rabbit-plugin")).settings(
    name := "Spark Streaming RabbitMQ Receiver",
    homepage := Some(url("https://github.com/Stratio/RabbitMQ-Receiver")),
    description := "RabbitMQ-Receiver is a library that allows the user to read data with Apache Spark from RabbitMQ.",
    exportJars := true,

    assemblyJarName in assembly := "rabbit.jar",
    test in assembly := {},

    moduleName := "spark-rabbitmq",
    organization := "com.stratio.receive",
    version := "0.6.0",

    libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
        "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided", 
        "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", 
        "com.typesafe.akka" %% "akka-actor" % "2.4.11",
        "com.rabbitmq" % "amqp-client" % "3.6.6",
        "joda-time" % "joda-time" % "2.8.2",
        "com.github.sstone" %% "amqp-client" % "1.5" % Test,
        "org.scalatest" %% "scalatest" % "2.2.2" % Test,
        "org.scalacheck" %% "scalacheck" % "1.11.3" % Test,
        "junit" % "junit" % "4.12" % Test, 
        "com.typesafe.akka" %% "akka-testkit" % "2.4.11" % Test
        ) 
)

lazy val root = (project in file("connector")).settings(
    name := "Connector from Rabbit to Kafka queue",
    description := "",
    exportJars := true,

    test in assembly := {},
    assemblyJarName in assembly := "project.jar",

    libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
        "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
        "com.thenewmotion" %% "akka-rabbitmq" % "3.0.0",
        "org.apache.kafka" % "kafka_2.10" % "0.10.1.1",
        "com.sksamuel.avro4s" %% "avro4s-core" % "1.6.4"
        ) 
) dependsOn rabbit

      

I also use sbt-assembly to build the fat jar for Spark (addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")), and the jar produced by the sbt assembly command is the one used in both examples above. I am running Spark 2.1.0.

I'm relatively new to the Spark/Scala ecosystem, so hopefully this is just a problem with my build settings. It makes no sense to me that shapeless would be unavailable in Spark.



1 answer


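zero323 has the correct answer as far as I can tell. Spark 2.1.0 has a dependency that itself depends on Shapeless 2.0.0. That older version is presumably loaded ahead of the newer shapeless bundled in the fat jar, which would explain why the Lazy.map method that avro4s calls cannot be found at runtime.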
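
One way to confirm which shapeless jar actually wins at runtime is to print the location a class was loaded from. The following is a minimal sketch using only the JDK and the Scala standard library; the object name WhichJar and the helper locationOf are hypothetical names introduced here for illustration.

```scala
import scala.util.Try

object WhichJar {
  // Returns the jar or directory a class was loaded from, if the JVM exposes it.
  // Yields None when the class is missing or has no recorded code source.
  def locationOf(className: String): Option[String] =
    for {
      cls <- Try(Class.forName(className)).toOption
      pd  <- Option(cls.getProtectionDomain)
      src <- Option(pd.getCodeSource)
      loc <- Option(src.getLocation)
    } yield loc.toString

  def main(args: Array[String]): Unit =
    println(locationOf("shapeless.Lazy").getOrElse("shapeless.Lazy not found or location unknown"))
}
```

Calling WhichJar.locationOf("shapeless.Lazy") from inside parseArrayEvent in the Spark job and again from the standalone program should show whether the two runs resolve shapeless from different jars.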



This problem can be solved in one of two ways: keep the dependency that uses Shapeless and shade Shapeless in the fat jar, or use a different Avro library. I went with the latter.
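
For the shading route, sbt-assembly supports shade rules from version 0.14.0 onward. The fragment below is a sketch of what that could look like among the settings of the connector project in build.sbt; the target prefix shadedshapeless is an arbitrary name chosen here, and I have not tested this against the build in the question.

```scala
// build.sbt fragment (sbt 0.13 syntax, matching the build in the question).
// Renames shapeless classes inside the fat jar so they cannot collide with
// the shapeless 2.0.0 that comes in through Spark's own dependencies.
assemblyShadeRules in assembly := Seq(
  // "shadedshapeless" is an arbitrary prefix chosen for this sketch.
  // inAll applies the rename to the project classes and every bundled jar,
  // so the references compiled into avro4s are rewritten as well.
  ShadeRule.rename("shapeless.**" -> "shadedshapeless.@1").inAll
)
```

After rebuilding with sbt assembly, the classes in project.jar should refer to shadedshapeless.Lazy rather than shapeless.Lazy, so the version on Spark's classpath no longer matters to avro4s.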
