Sparking: how to output streaming data in cassandra

I am reading kafka message streams using sparking. Now I want to install Cassandra as output. I created a table in cassandra "test_table" with columns "key: text primary key" and "value: text" I have successfully mapped the data in the JavaDStream<Tuple2<String,String>> data

following way:

JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));

JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap );
JavaDStream<Tuple2<String,String>> data = messages.map(new Function< Tuple2<String,String>, Tuple2<String,String> >() 
{
    public Tuple2<String,String> call(Tuple2<String, String> message)
    {
        return new Tuple2<String,String>( message._1(), message._2() );
    }
}
);  

      

Then I created a list:

List<TestTable> list = new ArrayList<TestTable>();

      

where TestTable is my custom class, which has the same structure as my Cassandra table, with key and value elements:

class TestTable
{
    String key;
    String val;

    public TestTable() {}

    public TestTable(String k, String v)
    {
        key=k;
        val=v;
    }

    public String getKey(){
        return key;
    }

    public void setKey(String k){
        key=k;
    }

    public String getVal(){
        return val;
    }

    public void setVal(String v){
        val=v;
    }

    public String toString(){
        return "Key:"+key+",Val:"+val;
    }
}

      

Please suggest a way to add data from JavaDStream<Tuple2<String,String>> data

to List<TestTable> list

. I do this so that I can later use

JavaRDD<TestTable> rdd = sc.parallelize(list); 
javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table"); 

      

to store the RDD data in Cassandra.

I've tried coding like this:

messages.foreachRDD(new Function<Tuple2<String,String>, String>()
                        {
                            public List<TestTable> call(Tuple2<String,String> message)
                            {
                                String k = message._1();
                                String v = message._2();
                                TestTable tbl = new TestTable(k,v);
                                list.put(tbl);
                            }
                        }
                    );

      

but there seems to be some error happening. Please, help.

+3


source to share


1 answer


Assuming the intent of this program is to save streaming data from kafka to Cassandra, there is no need to dump the data JavaDStream<Tuple2<String,String>>

into a list List<TestTable>

.

DataStax's Spark-Cassandra connector supports this feature directly through Spark Streaming extensions .

It is enough to use such extensions on JavaDStream

:



javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();

      

instead of draining data in the intermediate list.

+6


source







All Articles