DataStax Enterprise: saveToCassandra generates a lot of hinted handoffs

I am having problems writing data from Spark to Cassandra using DSE 4.5.3.

I have a cluster of 8 nodes (quite powerful nodes) and I want to generate some test data from Spark.

My Spark job reads 5M rows from a Cassandra table (representing one day of data), caches them in memory (32 GB of RAM per node, so no problem) and finally stores them n times in another Cassandra table to simulate more days of data.

    val table = sc.cassandraTable[RecordData]("data", "one_day").cache
    val firstDate = table.first.gets_dt_tm   // timestamp of the first row of the source day
    val start = 1
    val end = 10

    for (i <- start to end) {
        table.map(row => {
            // modify row to increment the row timestamp by i days

            Thread.sleep(2)   // crude attempt to throttle the write rate
            row
        }).saveToCassandra("data", "ten_days")
    }

      

I added the sleep to slow down the write process, but it didn't help. The problem is that my cluster generates a lot of hints and I have to constantly repair nodes. Keep in mind that I need to generate 600 days of data.

This is the structure of my table:

CREATE TABLE ten_days (
    YEAR int,
    MONTH int,
    DAY int,
    ID decimal,
    ... other fields
    S_DT_TM timestamp,
    PRIMARY KEY ((ID, C_TRX_REF), YEAR, MONTH, DAY, S_DT_TM)
);

      

ID and C_TRX_REF form a unique key within one day, but not across several days. The distinct count of (ID, C_TRX_REF) pairs is 5M.

S_DT_TM is a timestamp with one-second resolution, so it is not unique in my dataset.

Why do the Spark writes to Cassandra generate hints? Do you need more information? What are the best techniques for writing millions of rows to Cassandra from Spark?

Thanks.



1 answer


The sleep in your statement is most likely not actually slowing down the writes. Since the operations are applied lazily, per partition, the sleep most likely just adds a small pause as each partition is written rather than throttling the overall write rate.
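If you really do want to throttle manually, pausing once per group of rows inside mapPartitions is at least more predictable than a per-row sleep, because the map and the save run lazily in the same stage. This is only a sketch reusing the `table` RDD from the question; the group size and pause length are illustrative, and tuning the connector's write settings (below) is usually the better fix.

    // Hypothetical manual throttle: pause once per group of rows instead of per row.
    val throttled = table.mapPartitions { rows =>
        rows.grouped(1000).flatMap { group =>
            Thread.sleep(50)   // pause between groups of 1000 rows (illustrative values)
            group
        }
    }
    throttled.saveToCassandra("data", "ten_days")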

Now for the real problem. The only reason you would be generating hints is that one of your nodes is unable to keep up with the amount of data being written by your Spark job. This means the node was unreachable while a mutation was being applied, so the coordinator node kept a serialized copy of the mutation to replay once the unreachable node came back online. You can reduce the batch size to decrease the number of concurrent writes using

spark.cassandra.output.batch.size.rows: number of rows per batch; the default is "auto", which means the connector adjusts the number of rows based on the amount of data in each row

or



spark.cassandra.output.batch.size.bytes: maximum total batch size in bytes; the default is 64 kB.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md
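A minimal sketch of how these settings could be applied to the job from the question; the values are illustrative only, and spark.cassandra.output.concurrent.writes is an additional connector setting from the same saving documentation that limits how many batches each task keeps in flight.

    import com.datastax.spark.connector._
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
        .setAppName("ten-days-generator")
        // rows per batch instead of "auto"; when set, this takes precedence over batch.size.bytes
        .set("spark.cassandra.output.batch.size.rows", "200")
        // keep fewer batches in flight per task (illustrative value)
        .set("spark.cassandra.output.concurrent.writes", "2")

    val sc = new SparkContext(conf)

    sc.cassandraTable[RecordData]("data", "one_day")
        .map(row => row /* shift the timestamp by i days here, as in the question */)
        .saveToCassandra("data", "ten_days")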

But most likely you are better off increasing your write throughput capacity by making sure all of the data drives in your cluster are SSDs and that the commitlog and Spark directories are on SSDs as well.
