UPDATE the Cassandra table using the cassandra spark plug
I am facing issue with spark cassandra connector on scala when updating a table in my key space
Here is my code snippet
val query = "UPDATE " + COLUMN_FAMILY_UNIQUE_TRAFFIC + DATA_SET_DEVICE +
" SET a= a + " + b + " WHERE x=" +
x + " AND y=" + y +
" AND z=" + x
println(query)
val KeySpace = new CassandraSQLContext(sparkContext)
KeySpace.setKeyspace(KEYSPACE)
hourUniqueKeySpace.sql(query)
When I execute this code, I get an error like this
Exception in thread "main" java.lang.RuntimeException: [1.1] failure: ``insert'' expected but identifier UPDATE found
Any idea why this is happening? How can I fix this?
source to share
UPDATE the table with a counter column is possible via the spark-cassandra connector. You will need to use DataFrames and DataFrameWriter to save the method using "append" mode (or SaveMode . Use if you like). Check out the DataFrameWriter.scala code .
For example, given the table:
cqlsh:test> SELECT * FROM name_counter ;
name | surname | count
---------+---------+-------
John | Smith | 100
Zhang | Wei | 1000
Angelos | Papas | 10
The code should look like this:
val updateRdd = sc.parallelize(Seq(Row("John", "Smith", 1L),
Row("Zhang", "Wei", 2L),
Row("Angelos", "Papas", 3L)))
val tblStruct = new StructType(
Array(StructField("name", StringType, nullable = false),
StructField("surname", StringType, nullable = false),
StructField("count", LongType, nullable = false)))
val updateDf = sqlContext.createDataFrame(updateRdd, tblStruct)
updateDf.write.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "test", "table" -> "name_counter"))
.mode("append")
.save()
After UPDATE:
name | surname | count
---------+---------+-------
John | Smith | 101
Zhang | Wei | 1002
Angelos | Papas | 13
Converting DataFrame can be easier, implicitly converting RDD to DataFrame : import sqlContext.implicits._
and using .toDF()
.
Check out the full code of this toy claim: https://github.com/kyrsideris/SparkUpdateCassandra/tree/master
Since the versions are very important here, the above requirements are for Scala 2.11.7, Spark 1.5.1, spark-cassandra-connector 1.5.0-RC1-s_2.11, Cassandra 3.0.5. DataFrameWriter is denoted as @Experimental
, since @since 1.4.0
.
source to share
I believe you cannot update natively via SPARK connector. See the doc:
"The default Spark Cassandra Connector behavior is to overwrite collections when inserted into a cassandra table. To override this behavior, you can provide a custom mapper with instructions on how you want the collection to be handled."
Thus, you will want to actually insert a new record with the existing key.
source to share