Python + Cassandra CqlPagingInputFormat + Hadoop Streaming

Introduction

I have a Cassandra 1.2.19 cluster with Hadoop 1.2.1 installed and configured in fully distributed mode on top of it (plus an additional non-Cassandra node as master). Everything works fine and I can run map/reduce jobs on it.

Problem

Now I want to use Hadoop Streaming to run map/reduce with mappers and reducers written in Python. The mapper code is pretty simple: it reads each line from standard input, does a "map", and produces some output for the reducer.

I run map/reduce with a command like this:

hadoop jar /usr/share/hadoop/contrib/streaming/hadoop-streaming-1.2.1.jar \
-D cassandra.input.keyspace="keyspace_name" \
-D cassandra.input.partitioner.class="Murmur3Partitioner" \
-D cassandra.input.columnfamily="cf_name" \
-D cassandra.consistencylevel.read="ONE" \
-D cassandra.input.widerows=true \
-D cassandra.input.thrift.address=XXX.XXX.XXX.XXX \
-inputformat org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat \
-input /keyspace_name/cf_name \
-output /dev/test \
-file mapper.py  -mapper mapper.py \
-file reducer.py -reducer reducer.py


Sample code for mapper.py is as follows:

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)

for line in sys.stdin:
    try:
        # remove leading and trailing whitespace
        line = line.strip()
        # IMPORTANT NOTE: since our tables are key value stores
        #   we are expecting only 2 columns on the input record
        key, value = line.split('\t')
        # do something with the input, then emit a tab-delimited
        # record for the reducer (pass-through shown as a placeholder)
        print('%s\t%s' % (key, value))
    except ValueError:
        # skip records that do not split into exactly 2 columns
        continue


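For reference, the reducer.py from the command above follows the standard Hadoop Streaming pattern; a minimal sketch (with my actual reduce logic replaced by a placeholder count per key) looks like this:

#!/usr/bin/env python

import sys

current_key = None
values = []

# Hadoop Streaming sorts mapper output by key before the reduce
# phase, so all values for a given key arrive on consecutive lines
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    key, value = line.split('\t', 1)
    if key == current_key:
        values.append(value)
    else:
        if current_key is not None:
            # emit one output record per key (count is a placeholder)
            print('%s\t%d' % (current_key, len(values)))
        current_key = key
        values = [value]

# flush the last key
if current_key is not None:
    print('%s\t%d' % (current_key, len(values)))
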
Since the table I am using only contains 2 columns (it is a key-value store), I expected Hadoop Streaming to send my mapper tab-delimited key/value pairs (tab is the default delimiter), but instead it receives this:

{primary_key=java.nio.HeapByteBuffer[pos=97 lim=119 cap=1158]}  {value=java.nio.HeapByteBuffer[pos=139 lim=941 cap=1158]}
{primary_key=java.nio.HeapByteBuffer[pos=97 lim=119 cap=768]}   {value=java.nio.HeapByteBuffer[pos=139 lim=551 cap=768]}


That is, my mapper gets the string representations of HeapByteBuffers instead of the actual values.
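As far as I can tell, each line is just the Java toString() of a Map<String, ByteBuffer>, so only the buffer coordinates (pos/lim/cap) survive into the text. A quick sketch (assuming the lines arrive exactly as printed above) shows there is nothing left to decode on the Python side:

import re

# matches e.g. {primary_key=java.nio.HeapByteBuffer[pos=97 lim=119 cap=1158]}
BUFFER_RE = re.compile(
    r'\{(\w+)=java\.nio\.HeapByteBuffer\[pos=(\d+) lim=(\d+) cap=(\d+)\]\}')

sample = ('{primary_key=java.nio.HeapByteBuffer[pos=97 lim=119 cap=1158]}\t'
          '{value=java.nio.HeapByteBuffer[pos=139 lim=941 cap=1158]}')

for name, pos, lim, cap in BUFFER_RE.findall(sample):
    # only the buffer metadata is present; the cell contents
    # themselves were never serialized into the stream
    print('%s: %d bytes unavailable' % (name, int(lim) - int(pos)))
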

Questions

1. Is there a parameter missing from my map/reduce invocation that would tell Cassandra/Hadoop to give my mapper the actual values as input instead of these HeapByteBuffers?
2. Is there a way to get at the actual values of these HeapByteBuffers from Python?
3. Is there a better/different way to run Hadoop on top of Cassandra with mappers/reducers written in Python?

Thanks in advance!
