Get data from a DB for each row of a PySpark DataFrame

I am using the PySpark DataFrame API in a streaming context. For each RDD of the DStream produced by my Spark stream (I am using a Kafka receiver), I convert the RDD to a DataFrame. This is what I do in my process function:

from pyspark.sql import Row

# Turn each parsed Kafka record into a Row, then into a DataFrame
rowRdd = data_lined_parameters.map(
        lambda x: Row(SYS=x[0], METRIC='temp', SEN=x[1], OCCURENCE=x[2], THRESHOLD_HIGH=x[3], OSH=x[4], OSM=x[5], OEH=x[6], OEM=x[7], OSD=x[8], OED=x[9], REMOVE_HOLIDAYS=x[10], TS=x[11], VALUE=x[12], DAY=x[13], WEEKDAY=x[14], HOLIDAY=x[15]))
rawDataDF = sqlContext.createDataFrame(rowRdd)

# Keep only rows inside the observation window, on non-holidays, above the threshold
rawDataRequirementsCheckedDF = rawDataDF.filter(
    "WEEKDAY <= OED AND WEEKDAY >= OSD AND HOLIDAY = false AND VALUE > THRESHOLD_HIGH")
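For context, this code runs inside the DStream's foreachRDD callback. Below is a minimal sketch of the surrounding streaming driver, assuming the receiver-based Kafka API; kvs, parse_message, the batch interval, and the topic map are placeholders for my actual setup:

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 10)  # 10-second batches (placeholder value)
kvs = KafkaUtils.createStream(ssc, "clustdev1:2181", "consumer-group", {"metrics": 1})

def process(time, rdd):
    if rdd.isEmpty():
        return
    # parse_message is a placeholder for my own record parser
    data_lined_parameters = rdd.map(lambda kv: parse_message(kv[1]))
    # ... build rowRdd, rawDataDF and rawDataRequirementsCheckedDF as above ...

kvs.foreachRDD(process)
ssc.start()
ssc.awaitTermination()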

      

My next step is to enrich each row of rawDataRequirementsCheckedDF with new columns read from an HBase table. My question is: what is the most efficient way to get the data from HBase (through Phoenix) and join it to my original DataFrame, which looks like this:

+--------------------+-------+------+---------+---+---+---+---+---+---+---------------+---+----------------+--------------+--------------------+-------+-------+
|                 DAY|HOLIDAY|METRIC|OCCURENCE|OED|OEH|OEM|OSD|OSH|OSM|REMOVE_HOLIDAYS|SEN|             SYS|THRESHOLD_HIGH|                  TS|  VALUE|WEEKDAY|
+--------------------+-------+------+---------+---+---+---+---+---+---+---------------+---+----------------+--------------+--------------------+-------+-------+
|2017-08-03 00:00:...|  false|  temp|        3|  4| 19| 59|  0|  8|  0|           TRUE|  1|            0201|            26|2017-08-03 16:22:...|28.4375|      3|
|2017-08-03 00:00:...|  false|  temp|        3|  4| 19| 59|  0|  8|  0|           TRUE|  1|            0201|            26|2017-08-03 16:22:...|29.4375|      3|
+--------------------+-------+------+---------+---+---+---+---+---+---+---------------+---+----------------+--------------+--------------------+-------+-------+

      

The primary key of the HBase table is (DAY, SYS, SEN), so the enriched DataFrame should keep the same row granularity as the one above.
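To make the target concrete, this is the join shape I am after, as a minimal sketch; hbase_df stands for whatever DataFrame the HBase read produces, and I am assuming its key columns are named DATE, SYSTEMUID and SENSORUID as in my attempt below:

from pyspark.sql.functions import col

# hbase_df is a placeholder for the DataFrame read from HBase/Phoenix
enriched_df = rawDataRequirementsCheckedDF.join(
    hbase_df,
    (rawDataRequirementsCheckedDF["DAY"] == hbase_df["DATE"])
    & (rawDataRequirementsCheckedDF["SYS"] == hbase_df["SYSTEMUID"])
    & (rawDataRequirementsCheckedDF["SEN"] == hbase_df["SENSORUID"]),
    "left")  # keep every streaming row, add the HBase columns where they match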

EDIT:

This is what I have tried so far:

from pyspark.sql.functions import col

# Collect the SYS values of the current batch to build the Phoenix IN clause
sysList = rawDataRequirementsCheckedDF.rdd.map(lambda x: "'" + x['SYS'] + "'").collect()
df_sensor = sqlContext.read.format("jdbc").option("dbtable", "(select DATE,SYSTEMUID,SENSORUID,OCCURENCE from ANOMALY where SYSTEMUID in (" + ','.join(sysList) + "))").option("url", "jdbc:phoenix:clustdev1:2181:/hbase-unsecure").option("driver", "org.apache.phoenix.jdbc.PhoenixDriver").load()
df_anomaly = rawDataRequirementsCheckedDF.join(df_sensor, col("SYS") == col("SYSTEMUID"), 'outer')

      



1 answer


A simple way to bring data in from HBase is to create a table in Phoenix and then load it into Spark with the Phoenix Spark plugin, as described in the Apache Spark Plugin section of the Apache Phoenix documentation:

# Load the Phoenix table TABLE1 as a Spark DataFrame
df = sqlContext.read \
    .format("org.apache.phoenix.spark") \
    .option("table", "TABLE1") \
    .option("zkUrl", "localhost:2181") \
    .load()
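Once the Phoenix table is available as a DataFrame, enriching the streaming DataFrame is an ordinary join. A rough sketch, assuming the ANOMALY table and the column names from the question:

from pyspark.sql.functions import col

df_sensor = sqlContext.read \
    .format("org.apache.phoenix.spark") \
    .option("table", "ANOMALY") \
    .option("zkUrl", "clustdev1:2181:/hbase-unsecure") \
    .load()

df_anomaly = rawDataRequirementsCheckedDF.join(
    df_sensor,
    (col("DAY") == col("DATE"))
    & (col("SYS") == col("SYSTEMUID"))
    & (col("SEN") == col("SENSORUID")),
    "left")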

      



Apache Spark Plugin link: https://phoenix.apache.org/phoenix_spark.html







