JCB exclusive version not working

I have used sparkContext.broadcast function in my streaming spark application to share redis connection pool (JedisPool).

The code looks like this:

lazy val redisPool = {
  val pool = Redis.createRedisPool(redisHost, redisPort)
  ssc.sparkContext.broadcast(pool)
}

      

Redis.createRedisPool:

object Redis {

  def createRedisPool(host: String, port: Int, maxIdle: Int, maxTotal: Int, timeout: Int): JedisPool = {
    val pc = new JedisPoolConfig
    pc.setMaxIdle(maxIdle)
    pc.setMaxTotal(maxTotal)
    pc.setMaxWaitMillis(timeout)
    new JedisPool(pc, host, port)
  }

  def createRedisPool(host: String, port: Int): JedisPool = {
    createRedisPool(host, port, 5, 5, 5000)
  }
}

      

It works in local deployment mode, but when I run it in yarn / offline mode like

spark-submit --master "yarn-client" --class ...

      

will receive an error message:

Exception in thread "main" java.io.NotSerializableException: redis.clients.jedis.JedisPool
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1165)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:329)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:210)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809)
    at org.culiu.bd.streaming.AdSysStreaming$.redisPool$lzycompute$1(AdSysStreaming.scala:84)
    at org.culiu.bd.streaming.AdSysStreaming$.redisPool$1(AdSysStreaming.scala:82)
    at org.culiu.bd.streaming.AdSysStreaming$.main(AdSysStreaming.scala:154)
    at org.culiu.bd.streaming.AdSysStreaming.main(AdSysStreaming.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

      

I tried to set spark.serializer = org.apache.spark.serializer.KryoSerializer in my application and then got an error like:

Exception in thread "main" com.esotericsoftware.kryo.KryoException:        java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.spark.executor.ExecutorURLClassLoader)
factoryClassLoader (org.apache.commons.pool2.impl.GenericObjectPool)
internalPool (redis.clients.jedis.JedisPool)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:210)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809)
    at org.culiu.bd.streaming.AdSysStreaming$.redisPool$lzycompute$1(AdSysStreaming.scala:85)
    at org.culiu.bd.streaming.AdSysStreaming$.redisPool$1(AdSysStreaming.scala:83)
    at org.culiu.bd.streaming.AdSysStreaming$.main(AdSysStreaming.scala:155)
    at org.culiu.bd.streaming.AdSysStreaming.main(AdSysStreaming.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.ConcurrentModificationException
    at java.util.AbstractList$Itr.checkForComodification(AbstractList.java:372)
    at java.util.AbstractList$Itr.next(AbstractList.java:343)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    ... 39 more

      

how can i solve this?

+3


source to share


1 answer


It looks like the problem is that the class is redis.clients.jedis.JedisPool

not serializable. This doesn't seem like a Spark specific issue, as I think any attempt to serialize this class will fail.



+3


source







All Articles