Kafka spark data is not received in cassandra. inserted null lines

When writing data to cassandra from spark, no data is written.
Flash back:
I am doing the kafka-sparkStreaming-cassandra integration.
I am reading kafka posts and trying to put it into cassandra table CREATE TABLE TEST_TABLE(key INT PRIMARY KEY, value TEXT)

.
Kafka for sparking works cool, but spark for cassandra, there are some problems ... the data is not getting to the table. I can create a connection with cassandra, but no data is inserted into the cassandra table. The output shows that it connects and the next second one disconnects.
The lines for are System.out.print()

displayed in the output.

+++++++++++cassandra connector created++++++++++++++++++++++++++++
+++++++++++++streaming Connection done!+++++++++++++++++++++++++++
++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++

      

Cassandra shell shows 0 lines.
complete code and logs and dependencies are below:

public class SparkStream {
    static int key=0;
    public static void main(String args[]) throws Exception
    {

        if(args.length != 3)
        {
            System.out.println("parameters not given properly");
            System.exit(1);
        }

        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(3));
        }

        /* Connection to Spark */
        SparkConf conf = new SparkConf();
        conf.set("spark.cassandra.connection.host", "localhost");
        JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf);
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000));


        /* connection to cassandra */
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());
        System.out.println("+++++++++++cassandra connector created++++++++++++++++++++++++++++");


        /* Receive Kafka streaming inputs */
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap );
        System.out.println("+++++++++++++streaming Connection done!+++++++++++++++++++++++++++");


        /* Create DStream */                
        JavaDStream<TestTable> data = messages.map(new Function< Tuple2<String,String>, TestTable >() 
        {
            public TestTable call(Tuple2<String, String> message)
            {
                return new TestTable(new Integer(++key), message._2() );
            }
        }
        );
        System.out.println("++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++");


        /* Write to cassandra */
        javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();


        jssc.start();
        jssc.awaitTermination();

    }
}

class TestTable implements Serializable
{
    Integer key;
    String value;

    public TestTable() {}

    public TestTable(Integer k, String v)
    {
        key=k;
        value=v;
    }

    public Integer getKey(){
        return key;
    }

    public void setKey(Integer k){
        key=k;
    }

    public String getValue(){
        return value;
    }

    public void setValue(String v){
        value=v;
    }

    public String toString(){
        return MessageFormat.format("TestTable'{'key={0}, value={1}'}'", key, value);

    }
}

      

Magazine:

+++++++++++cassandra connector created++++++++++++++++++++++++++++
+++++++++++++streaming Connection done!+++++++++++++++++++++++++++
++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++
14/12/09 12:07:33 INFO core.Cluster: New Cassandra host localhost/127.0.0.1:9042 added
14/12/09 12:07:33 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:34 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster

14/12/09 12:07:45 INFO core.Cluster: New Cassandra host localhost/127.0.0.1:9042 added
14/12/09 12:07:45 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:46 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster

      

POM.xml dependencies:

   <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>

<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.10</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.1.1</version>
</dependency>


    <dependency>
        <groupId>com.msiops.footing</groupId>
        <artifactId>footing-tuple</artifactId>
        <version>0.2</version>
    </dependency>   

<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>2.1.3</version>
</dependency>

      

Is there something wrong with the code? or cassandra config?

0


source to share


1 answer


solved a problem. columnMapper was unable to access the getters and setters of the TestTable class. So changed the access modifier to public. but now I had 2 public classes in one file. which is a mistake. so created another Java file TestTable.java with class like

public class TestTable implements Serializable { 
//code
}

      



now messages are read from kafka and stored in cassandra table

+1


source







All Articles