Setting the number of Spark tasks when scanning a Cassandra table

I have a simple Spark job that reads 500 million rows from a 5-node Cassandra cluster, and the table scan always runs as 6 tasks, which causes write problems because of the size of each task. I tried adjusting the input split size (spark.cassandra.input.split.size_in_mb), but it doesn't seem to have any effect. At the moment I am forced to repartition after the table scan, which is not ideal because it is expensive.

After reading a few posts, I also tried increasing the number of executors in my launch script (below), but that had no effect either.

If there is no way to set the number of tasks for a Cassandra table scan, then I can live with that, but I have a nagging feeling that I am missing something.

The Spark workers live on the C* nodes, which are 8-core, 64 GB servers with 2 TB SSDs each.

...
val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", cassandraHost)
  .setAppName("rowMigration")
conf.set("spark.shuffle.memoryFraction", "0.4")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.executor.memory", "15G")
conf.set("spark.cassandra.input.split.size_in_mb", "32") // default 64 MB
conf.set("spark.cassandra.output.batch.size.bytes", "1000") // default
conf.set("spark.cassandra.output.concurrent.writes", "5") // default

val sc = new SparkContext(conf)

// Scan the source table and keep only the rows for the account being migrated.
// The scan itself only yields 6 tasks, hence the explicit repartition.
val rawEvents = sc.cassandraTable(cassandraKeyspace, eventTable)
  .select("accountid", "userid", "eventname", "eventid", "eventproperties")
  .filter(row => row.getString("accountid").equals("someAccount"))
  .repartition(100)

// "object" is a reserved word in Scala, so the mapped RDD needs another name.
val mappedEvents = rawEvents
  .map(ele => (ele.getString("userid"),
    UUID.randomUUID(),
    UUID.randomUUID(),
    ele.getUUID("eventid"),
    ele.getString("eventname"),
    "event type",
    UUIDs.unixTimestamp(ele.getUUID("eventid")),
    ele.getMap[String, String]("eventproperties"),
    Map[String, String](),
    Map[String, String](),
    Map[String, String]()))
  .map(row => MyObject(row))

mappedEvents.saveToCassandra(targetCassandraKeyspace, eventTable)

Launch script:

#!/bin/bash
export SHADED_JAR="Migrate.jar"
export SPARKHOME="${SPARKHOME:-/opt/spark}"
export SPARK_CLASSPATH="$SHADED_JAR:$SPARK_CLASSPATH"
export CLASS=com.migration.migrate
"${SPARKHOME}/bin/spark-submit" \
        --class "${CLASS}" \
        --jars "$SHADED_JAR" \
        --master spark://cas-1-5:7077 \
        --num-executors 15 \
        --executor-memory 20g \
        --executor-cores 4 "$SHADED_JAR" \
        --worker-cores 20 \
        -Dcassandra.connection.host=10.1.20.201 \
        -Dzookeeper.host=10.1.20.211:2181

EDIT - Following Peter's answer:

I set ReadConf.splitCount on sc.cassandraTable as shown below, but it does not change the number of generated tasks, which means I still need to repartition after the table scan. I'm starting to think that I'm looking at this the wrong way and that the repartition is unavoidable. The job currently takes about 1.5 hours, and repartitioning the table scan into 1,000 tasks of roughly 10 MB each brought the write time down to minutes.

val cassReadConfig = new ReadConf {
  ReadConf.apply(splitCount = Option(1000))
}

val sc = new SparkContext(conf)

val rawEvents = sc.cassandraTable(cassandraKeyspace, eventTable)
  .withReadConf(readConf = cassReadConfig)

2 answers


Since Spark Cassandra Connector 1.3, split sizes are estimated from Cassandra's system.size_estimates table, which is available as of Cassandra 2.1.5. Cassandra refreshes that table periodically, so shortly after loading or deleting data, or after new nodes join the ring, its contents may be inaccurate. Check whether the estimates there match your actual data volume. This is a relatively new feature, so it is quite possible there are still some bugs in it.
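If it helps, the estimates can be inspected from the Spark side as well; a minimal sketch, assuming the connector's CassandraConnector helper and reusing the cassandraKeyspace / eventTable names from the question:

// Sketch only: read Cassandra's own size estimates for the scanned table.
// Assumes the Spark Cassandra Connector's CassandraConnector and the
// cassandraKeyspace / eventTable values from the question above.
import scala.collection.JavaConverters._
import com.datastax.spark.connector.cql.CassandraConnector

CassandraConnector(sc.getConf).withSessionDo { session =>
  val ranges = session.execute(
    "SELECT mean_partition_size, partitions_count FROM system.size_estimates " +
      "WHERE keyspace_name = ? AND table_name = ?",
    cassandraKeyspace, eventTable).all().asScala
  val estimatedBytes = ranges.map(r => r.getLong("mean_partition_size") * r.getLong("partitions_count")).sum
  println(s"~$estimatedBytes bytes estimated across ${ranges.size} token ranges")
}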

If the estimates are wrong or you are running an older Cassandra, we have left the option to override the automatic split sizing: sc.cassandraTable takes an optional ReadConf parameter in which you can set splitCount, which forces a fixed number of partitions.
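For illustration, a minimal sketch of that override, assuming a connector version where ReadConf is a case class with a fromSparkConf helper (variable names reuse the question's):

// Sketch: force a fixed number of input partitions instead of relying on size estimates.
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.ReadConf

val readConf = ReadConf.fromSparkConf(sc.getConf).copy(splitCount = Some(1000))

val rawEvents = sc.cassandraTable(cassandraKeyspace, eventTable)
  .withReadConf(readConf)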



As for the split_size_in_mb parameter, there was indeed a bug in the project source for some time, but it was fixed before any version published to Maven was released. So unless you are compiling the connector from (old) source, you should not hit it.


There seems to be a bug with the split.size_in_mb parameter: the code may interpret it as bytes instead of megabytes, so try changing 32 to something much larger. See the examples in the answers here.
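If that is the case, one possible workaround (only a sketch, mirroring the question's own conf.set call and assuming the value really is being read as bytes) is to pass the intended split size in bytes:

// Workaround sketch: if the connector treats the setting as bytes rather than MB,
// 32 MB expressed in bytes should produce splits of roughly the intended size.
conf.set("spark.cassandra.input.split.size_in_mb", (32L * 1024 * 1024).toString)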


