Correct use of the Spark Cassandra connector

I want to use Spark for some ETL jobs that mostly consist of "update" statements (the target column is a set that gets appended to, so a simple insert probably won't work). As such, issuing CQL queries to import the data seems like the best option. With the Spark Cassandra connector, I see that I can do this:

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connecting-manually-to-cassandra

Now, I don't want to open a session and close it for every row in the source (am I right not to want this? Usually I have one session for the entire process and keep using it, as in "normal" programs). However, the documentation says the connector is serializable, while the session obviously is not. So wrapping the whole import inside a single "withSessionDo" seems likely to cause problems. I was thinking of using something like this:

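import org.apache.spark.SparkConf
import com.datastax.spark.connector.cql.CassandraConnector

class CassandraStorage(conf: SparkConf) {
  // One session opened eagerly and held for the lifetime of this object
  val session = CassandraConnector(conf).openSession()
  def store(t: Thingy): Unit = {
    // session.execute cql goes here
  }
}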

Is this a good approach? Do I need to worry about closing the session? Where and how would I best do that? Any pointers are appreciated.

1 answer


You actually do want to use withSessionDo, because it won't actually open and close a session on every access. Under the hood, withSessionDo accesses a JVM-level session. This means you will only have one session object PER cluster configuration PER node.

This means that code like

val connector = CassandraConnector(sc.getConf)
// "stuff" stands for whatever work you do with the session
sc.parallelize(1 to 10000000L).map(x => connector.withSessionDo(session => stuff))

will only ever create one Cluster and Session object per executor JVM, no matter how many cores each machine has.



For efficiency, I still recommend using mapPartitions to minimize cache checks.

sc.parallelize(1 to 10000000L)
  .mapPartitions(it => connector.withSessionDo(session =>
    it.map(row => row /* do stuff with session and row here */)))

In addition, the session object also uses a prepared statement cache, so you can prepare a statement inside your serialized code and it will only ever be prepared once per JVM (all other calls return the cached reference).
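For the set-append ETL described in the question, the pieces above could be combined roughly as follows. This is only a sketch under assumptions: the keyspace `etl`, the table `things`, its columns `id` (int) and `tags` (set<text>), and the sample input are all hypothetical, and the Cassandra host is assumed to be supplied through the Spark configuration. It uses foreachPartition rather than mapPartitions because the job only writes and returns nothing, which also means the rows are consumed while the session is still borrowed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector.cql.CassandraConnector

object SetAppendEtl {
  // Hypothetical keyspace, table and columns: etl.things(id int PRIMARY KEY, tags set<text>)
  val AppendCql: String = "UPDATE etl.things SET tags = tags + ? WHERE id = ?"

  def main(args: Array[String]): Unit = {
    // Assumes spark.cassandra.connection.host is set, e.g. via spark-submit --conf
    val sc = new SparkContext(new SparkConf().setAppName("set-append-etl"))

    // The connector is serializable, so it can be captured by the closure below
    val connector = CassandraConnector(sc.getConf)

    // Hypothetical input: (id, tag to add to that row's set)
    val updates = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c")))

    updates.foreachPartition { rows =>
      // Borrows the shared per-JVM session; nothing is opened or closed per row
      connector.withSessionDo { session =>
        // Prepared once per partition here; repeated prepares should hit the cache
        val stmt = session.prepare(AppendCql)
        rows.foreach { case (id, tag) =>
          // A set<text> value is bound as a java.util.Set
          session.execute(stmt.bind(java.util.Collections.singleton(tag), Int.box(id)))
        }
      }
    }

    sc.stop()
  }
}
```

With this shape there is no session to close by hand: withSessionDo hands the session back when the block finishes, and the connector decides when the underlying connection is actually released.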
