Spark Streaming: how to output streaming data to Cassandra
I am reading Kafka message streams using Spark Streaming. Now I want to use Cassandra as the output. I created a table "test_table" in Cassandra with columns "key: text primary key" and "value: text". I have successfully mapped the data into JavaDStream<Tuple2<String,String>> data in the following way:
JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));

JavaPairReceiverInputDStream<String, String> messages =
    KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

JavaDStream<Tuple2<String, String>> data =
    messages.map(new Function<Tuple2<String, String>, Tuple2<String, String>>() {
        public Tuple2<String, String> call(Tuple2<String, String> message) {
            return new Tuple2<String, String>(message._1(), message._2());
        }
    });
Then I created a list:
List<TestTable> list = new ArrayList<TestTable>();
where TestTable is my custom class, which has the same structure as my Cassandra table, with key and value elements:
class TestTable implements Serializable // Spark ships instances to executors, so the class must be Serializable
{
    String key;
    String val; // note: the Cassandra column is named "value", so the mapping has to account for this

    public TestTable() {}

    public TestTable(String k, String v)
    {
        key = k;
        val = v;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String k) {
        key = k;
    }

    public String getVal() {
        return val;
    }

    public void setVal(String v) {
        val = v;
    }

    public String toString() {
        return "Key:" + key + ",Val:" + val;
    }
}
Please suggest a way to add the data from JavaDStream<Tuple2<String,String>> data to List<TestTable> list. I need this so that I can later use

JavaRDD<TestTable> rdd = sc.parallelize(list);
javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table");

to store the RDD data in Cassandra.
I've tried coding like this:
messages.foreachRDD(new Function<Tuple2<String,String>, String>()
{
    // This does not compile: foreachRDD hands the function the whole batch as a
    // JavaPairRDD<String, String>, not individual tuples, and this call signature
    // does not match the declared Function<Tuple2<String,String>, String> type.
    public List<TestTable> call(Tuple2<String,String> message)
    {
        String k = message._1();
        String v = message._2();
        TestTable tbl = new TestTable(k, v);
        list.put(tbl); // List has no put() method; this would have to be list.add(tbl)
    }
}
);
but it does not compile. Please help.
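For what it's worth, even a version of this attempt that compiles, such as the sketch below (written against the Spark 1.x Java API, where foreachRDD on a pair stream takes a Function<JavaPairRDD<K,V>, Void>), still funnels every batch through the driver:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    public Void call(JavaPairRDD<String, String> rdd) {
        // collect() pulls the entire micro-batch back to the driver, so this
        // only works for small volumes and forfeits Spark's parallelism.
        for (Tuple2<String, String> message : rdd.collect()) {
            list.add(new TestTable(message._1(), message._2()));
        }
        return null;
    }
});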
Assuming the intent of this program is to save streaming data from Kafka to Cassandra, there is no need to dump the JavaDStream<Tuple2<String,String>> into an intermediate List<TestTable>.
DataStax's Spark-Cassandra Connector supports this directly through its Spark Streaming extensions. It is enough to apply those extensions to the JavaDStream:
javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();
instead of collecting the data into an intermediate list.
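Put end to end, the pipeline could look roughly like the sketch below. This is written against the connector's Java API as introduced in version 1.2 (CassandraStreamingJavaUtil and mapToRow); older 1.0/1.1 releases instead expose the javaFunctions(rdd, clazz).saveToCassandra(keyspace, table) form used in the question. Note that the stream must be mapped to TestTable first, and that mapToRow matches bean property names to column names, so the val property would need to be renamed to value (or given an explicit field-to-column mapping) to line up with the table above:

import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import scala.Tuple2;

// Turn the (key, value) pairs coming from Kafka straight into TestTable objects...
JavaDStream<TestTable> rows = messages.map(
    new Function<Tuple2<String, String>, TestTable>() {
        public TestTable call(Tuple2<String, String> message) {
            return new TestTable(message._1(), message._2());
        }
    });

// ...and let the connector write each micro-batch to Cassandra as it arrives.
javaFunctions(rows)
    .writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class))
    .saveToCassandra();

jssc.start();
jssc.awaitTermination();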