RDD only has the first column value: HBase, PySpark
We are reading an HBase table from PySpark using the following commands.
from pyspark.sql.types import *
host=<Host Name>
port=<Port Number>
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
cmdata_conf = {"hbase.zookeeper.property.clientPort":port, "hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": "CMData", "hbase.mapreduce.scan.columns": "info:Tenure info:Age"}
cmdata_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=cmdata_conf)
output = cmdata_rdd.collect()
output
I am getting the result shown below (Key and Age only).
[(u'123', u'5'), (u'234', u'4'), (u'345', u'3'), (u'456', u'4'), (u'567', u'7'), (u'678', u'7'), (u'789', u'8')]
Instead, I expect Key, Tenure and Age. If I request only the Tenure column, the result contains Key and Tenure. But as soon as I add more columns, the result always contains only Key and Age.
Can anyone help us solve this problem?
Note: we are new to these tools.
Thanks in advance.
1 answer
If you're prototyping and don't want to update your cluster, it might be helpful to have a look at happybase ( https://happybase.readthedocs.org/en/latest/ ).
The following code reads my small (9 GB) HBase table 'name_Hbase_Table' from my cluster in less than a second.
import happybase

connection = happybase.Connection(host='your.ip.cluster')  # don't specify :port
table = connection.table('name_Hbase_Table')

def hbaseAccelerationParser(table):  # helper function to format the data
    finalTable = []
    for key, data in table.scan():  # don't need the key in my case
        # data is a dict of column name -> value; keep only the values
        finalTable.append(list(data.values()))
    return finalTable

rows = hbaseAccelerationParser(table)  # capture data in desired format
rdd = sc.parallelize(rows, 4)  # put in RDD (sc is the SparkContext from the PySpark shell)
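Since the question asks for Key, Tenure and Age together, here is a minimal sketch of how the scan results could be flattened with the row key kept and the columns in a fixed order. The helper name `scan_to_rows` and the stand-in data are hypothetical (the Tenure value is made up for illustration); the only thing assumed about happybase is that `table.scan()` yields `(row_key, dict)` pairs, so a plain list is used here in place of a live cluster.

```python
def scan_to_rows(scan_results, columns):
    """Turn (row_key, {column: value}) pairs into [key, value1, value2, ...] lists.

    `columns` fixes the output order; a column missing from a row becomes None.
    """
    rows = []
    for key, data in scan_results:
        rows.append([key] + [data.get(col) for col in columns])
    return rows


# Columns from the question, in the order we want them in each row.
COLUMNS = [b"info:Tenure", b"info:Age"]

# Hypothetical stand-in for table.scan(columns=COLUMNS); the values are illustrative only.
fake_scan = [
    (b"123", {b"info:Age": b"5", b"info:Tenure": b"12"}),
    (b"234", {b"info:Age": b"4"}),  # a row with no Tenure cell
]

rows = scan_to_rows(fake_scan, COLUMNS)
print(rows)
```

With a real happybase table object (the one returned by `connection.table(...)`), the same helper would presumably be called as `scan_to_rows(table.scan(columns=COLUMNS), COLUMNS)`, and the resulting list could then be passed to `sc.parallelize`. Looking values up by column name, rather than relying on the dict's iteration order, keeps Tenure and Age in the same position for every row.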