WordCount with Apache Crunch to HBase Standalone

I am currently evaluating Apache Crunch. I followed a simple WordCount MapReduce example, and now I am trying to save the results to a standalone HBase instance. HBase is running (verified with jps and the HBase shell), set up as described in the quickstart guide: http://hbase.apache.org/book/quickstart.html

I am now using an example to write to HBase:

Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
PCollection<String> lines = pipeline.readTextFile(inputPath);
// noStopWords is derived from lines (stop-word filtering omitted here)
PTable<String, Long> counts = noStopWords.count();
pipeline.write(counts, new HBaseTarget("wordCountOutTable"));
PipelineResult result = pipeline.done();


I am getting an exception: "java.lang.IllegalArgumentException: HBaseTarget only supports Put and Delete"

Any clues what went wrong?

1 answer


A PTable is a PCollection, but HBaseTarget can only handle Put or Delete objects. So you need to convert the PTable into a PCollection where each item in the collection is either a Put or a Delete. Check out the crunch-examples module for how to do this.

An example conversion might look like this:



public PCollection<Put> createPut(final PTable<String, String> counts) {
  return counts.parallelDo("Convert to puts", new DoFn<Pair<String, String>, Put>() {
    @Override
    public void process(final Pair<String, String> input, final Emitter<Put> emitter) {
      // input.first() is used as the row key
      Put put = new Put(Bytes.toBytes(input.first()));
      // COLUMN_FAMILY_TARGET and COLUMN_QUALIFIER_TARGET_TEXT are byte[] constants;
      // the value (input.second()) is stored under that family and qualifier
      put.add(COLUMN_FAMILY_TARGET, COLUMN_QUALIFIER_TARGET_TEXT, Bytes.toBytes(input.second()));
      emitter.emit(put);
    }
  }, Writables.writables(Put.class));
}
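In the word-count case the table is a PTable&lt;String, Long&gt;, so the conversion needs a DoFn over Pair&lt;String, Long&gt; instead. A sketch of wiring this into the question's pipeline might look like the following (COLUMN_FAMILY and COLUMN_QUALIFIER are hypothetical byte[] constants; adapt them to your table's schema):

```java
// Sketch, assuming counts is the PTable<String, Long> from the question
// and COLUMN_FAMILY / COLUMN_QUALIFIER are byte[] constants for your table.
PCollection<Put> puts = counts.parallelDo("Counts to puts",
    new DoFn<Pair<String, Long>, Put>() {
      @Override
      public void process(final Pair<String, Long> input, final Emitter<Put> emitter) {
        // the word becomes the row key
        Put put = new Put(Bytes.toBytes(input.first()));
        // the count (a long) becomes the cell value
        put.add(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(input.second()));
        emitter.emit(put);
      }
    }, Writables.writables(Put.class));

// the target now receives Put objects, so the write no longer throws
pipeline.write(puts, new HBaseTarget("wordCountOutTable"));
PipelineResult result = pipeline.done();
```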
