RDD only has the first column value: HBase, PySpark

We are reading an HBase table from PySpark using the following commands.

from pyspark.sql.types import *
host=<Host Name>
port=<Port Number>

keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

cmdata_conf = {
    "hbase.zookeeper.property.clientPort": port,
    "hbase.zookeeper.quorum": host,
    "hbase.mapreduce.inputtable": "CMData",
    "hbase.mapreduce.scan.columns": "info:Tenure info:Age",
}

cmdata_rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv,
    valueConverter=valueConv,
    conf=cmdata_conf,
)

output = cmdata_rdd.collect()

output


I am getting the result shown below (key and Age only).

[(u'123', u'5'), (u'234', u'4'), (u'345', u'3'), (u'456', u'4'), (u'567', u'7'), (u'678', u'7'), (u'789', u'8')]


Instead, I expect the key, Tenure, and Age. If I keep only the Tenure column in the scan, it returns the key and Tenure. But if I add more columns, the result always contains only the key and the Age column.
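For what it's worth, newer versions of the Spark example converter (HBaseResultToStringConverter) emit one string per HBase cell, joined by newlines, so every requested column can be recovered by splitting the value per row. A minimal pure-Python sketch of that split step (the sample key/value pairs below are assumed for illustration, not output from our cluster; on a real RDD this would be cmdata_rdd.flatMap(split_cells)):

```python
# Sketch: recover all cells from a newline-joined, per-row value string,
# as produced by newer Spark example HBase converters. Sample data assumed.

def split_cells(key_value):
    """Expand one (row_key, multi-cell string) pair into per-cell pairs."""
    key, value = key_value
    return [(key, cell) for cell in value.split("\n")]

# Hypothetical scan output: each value holds Tenure and Age, one per line.
sample = [(u'123', u'5\n30'), (u'234', u'4\n25')]
flattened = [pair for kv in sample for pair in split_cells(kv)]
print(flattened)
# [('123', '5'), ('123', '30'), ('234', '4'), ('234', '25')]
```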

Can anyone help us solve this problem?

Note: we are new to these tools.

Thanks in advance.



1 answer


If you're prototyping and don't want to upgrade your cluster, it might be helpful to have a look at happybase ( https://happybase.readthedocs.org/en/latest/ ).

The following code does the trick to get my small (9 GB) HBase table 'name_Hbase_Table' from my cluster in less than a second.



import happybase

connection = happybase.Connection(host='your.ip.cluster')  # don't specify :port
table = connection.table('name_Hbase_Table')

def hbaseAccelerationParser(table):  # format the scanned rows
    finalTable = []
    for key, data in table.scan():  # the row key is not needed in my case
        line = []
        for values in data.itervalues():
            line.append(values)
        finalTable.append(line)
    return finalTable

data = hbaseAccelerationParser(table)  # capture data in the desired format
data = sc.parallelize(data, 4)  # put it in an RDD
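The parsing step can be exercised without a cluster by feeding it a stand-in object whose scan() yields (row_key, column_dict) pairs the way a happybase table does. FakeTable and the sample rows below are assumptions for illustration only:

```python
# Sketch: test the row-parsing logic offline with a fake happybase-style table.
# FakeTable and its rows are made up; happybase returns bytes keys/values.

class FakeTable:
    def __init__(self, rows):
        self._rows = rows

    def scan(self):
        # Mimic happybase Table.scan(): yield (row_key, {column: value}) pairs.
        return iter(self._rows)

def hbase_parser(table):
    """Collect each row's cell values into a list, dropping the row keys."""
    final_table = []
    for key, data in table.scan():
        final_table.append(list(data.values()))
    return final_table

rows = [
    (b'123', {b'info:Tenure': b'5', b'info:Age': b'30'}),
    (b'234', {b'info:Tenure': b'4', b'info:Age': b'25'}),
]
print(hbase_parser(FakeTable(rows)))
# [[b'5', b'30'], [b'4', b'25']]
```

Because the keys are dropped here too, this mirrors the answer's approach; keep the key in the appended list if you still need it downstream.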


