Bulk loading into Cassandra with MapReduce

I don't have much experience with Cassandra, so please excuse me if I'm wrong.

I am trying to bulk load data into Cassandra with MapReduce.

Basically a word count example

Link: http://henning.kropponline.de/2012/11/15/using-cassandra-hadoopbulkoutputformat/

I followed a simple Hadoop Wordcount Mapper example and modified the driver and reducer code slightly to match the example above.

I have also generated the output file successfully. Now I am unsure how to do the upload to Cassandra. Is there anything wrong with my approach?

Please advise.

This is part of the driver code

 Job job = new Job();
 job.setJobName(getClass().getName());
 job.setJarByClass(CassaWordCountJob.class);

 Configuration conf = job.getConfiguration();
 conf.set("cassandra.output.keyspace", "test");
 conf.set("cassandra.output.columnfamily", "words");
 conf.set("cassandra.output.partitioner.class", "org.apache.cassandra.dht.RandomPartitioner");
 conf.set("cassandra.output.thrift.port","9160");    // default
 conf.set("cassandra.output.thrift.address", "localhost");
 conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "400");

 job.setMapperClass(CassaWordCountMapper.class);
 job.setMapOutputKeyClass(Text.class);
 job.setMapOutputValueClass(IntWritable.class);
 FileInputFormat.setInputPaths(job, new Path(args[0]));
 job.setReducerClass(CassaWordCountReducer.class);
 FileOutputFormat.setOutputPath(job, new Path("/home/user/Desktop/test/cassandra")); 
 MultipleOutputs.addNamedOutput(job, "reducer", BulkOutputFormat.class, ByteBuffer.class, List.class);
 return job.waitForCompletion(true) ? 0 : 1;

      

The mapper is the same as a normal word count mapper, which just tokenizes the input and emits (word, 1).

The reducer class has the form

public class CassaWordCountReducer extends 
        Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        List<Mutation> columnsToAdd = new ArrayList<Mutation>();
        Integer wordCount = 0;
        for(IntWritable value : values) {
            wordCount += value.get();
        }
        Column countCol = new Column(ByteBuffer.wrap("count".getBytes()));
        countCol.setValue(ByteBuffer.wrap(wordCount.toString().getBytes()));
        countCol.setTimestamp(new Date().getTime());
        ColumnOrSuperColumn wordCosc = new ColumnOrSuperColumn();
        wordCosc.setColumn(countCol);
        Mutation countMut = new Mutation();
        countMut.column_or_supercolumn = wordCosc;
        columnsToAdd.add(countMut);
        context.write(ByteBuffer.wrap(key.toString().getBytes()), columnsToAdd);
    }
}

      


1 answer


To bulk load data into Cassandra, I would suggest looking at this article from DataStax. Basically, you need to do two things for a bulk load:

  • Your output files can't be loaded into Cassandra as they are; you need to convert them to SSTables.
  • Once you have SSTables, you need to stream them to Cassandra. Of course, you don't want to copy every SSTable to every node; you only want to send each node the part of the data it is responsible for.
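
To make the second point more concrete, here is a minimal sketch of how a row key can be mapped to the node that owns it. It is a toy model, not the code sstableloader runs: the class `TokenRingSketch`, the node names, and the node tokens are all hypothetical, and the token function only follows the general idea of RandomPartitioner (the absolute value of the MD5 hash of the key).

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a token ring; an illustration, not Cassandra's implementation. */
final class TokenRingSketch {
    // Node token -> hypothetical node name, kept sorted by token.
    private final TreeMap<BigInteger, String> ring = new TreeMap<>();

    void addNode(BigInteger token, String node) {
        ring.put(token, node);
    }

    /** MD5-based token in the spirit of RandomPartitioner: |MD5(key)| as an integer. */
    static BigInteger tokenFor(String rowKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(rowKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(digest).abs();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    /** Owner = first node whose token is >= the given token, wrapping around the ring. */
    String ownerOf(BigInteger token) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("ring has no nodes");
        }
        Map.Entry<BigInteger, String> entry = ring.ceilingEntry(token);
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    public static void main(String[] args) {
        // Three hypothetical nodes with evenly spaced tokens in [0, 2^127).
        BigInteger max = BigInteger.ONE.shiftLeft(127);
        BigInteger step = max.divide(BigInteger.valueOf(3));
        TokenRingSketch ring = new TokenRingSketch();
        ring.addNode(BigInteger.ZERO, "node-a");
        ring.addNode(step, "node-b");
        ring.addNode(step.multiply(BigInteger.valueOf(2)), "node-c");

        for (String word : new String[] {"hello", "world", "cassandra"}) {
            System.out.println(word + " -> " + ring.ownerOf(tokenFor(word)));
        }
    }
}
```

In this model each word from the word count lands on exactly one node, which is why a loader only needs to send each node the rows whose tokens fall in that node's range. Replication is ignored here for simplicity.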

In your case, since you are using BulkOutputFormat, it should do all of this for you, using sstableloader behind the scenes. I've never used it with MultipleOutputs, but it should work fine.



I think the mistake in your case is that you are not using MultipleOutputs correctly: you still call context.write when you should really be writing to your MultipleOutputs object. The way you are doing it right now, since you are writing to the regular Context, your output is picked up by the default output format, TextOutputFormat, and not by the one you defined in MultipleOutputs. More details on how to use MultipleOutputs in your reducer here.

Once you write to the correct output format, BulkOutputFormat, as you have defined it, your SSTables should be created and streamed to Cassandra from every node in your cluster. You don't need an extra step; the output format will take care of it for you.

I would also suggest looking at this post, where they explain how to use BulkOutputFormat as well, but they use ConfigHelper, which you may want to look into to configure your Cassandra endpoint more easily.
