Spark RDD: look up values by key
I have an RDD converted from HBase:
val hbaseRDD: RDD[(String, Array[String])], where tuple._1 is the row key (a string) and the array holds the values from HBase:
4929101-ACTIVE, ["4929101","2015-05-20 10:02:44","dummy1","dummy2"]
4929102-ACTIVE, ["4929102","2015-05-20 10:02:44","dummy1","dummy2"]
4929103-ACTIVE, ["4929103","2015-05-20 10:02:44","dummy1","dummy2"]
I also have a SchemaRDD (id, date1, col1, col2, col3) converted to
val refDataRDD: RDD[(String, Array[String])], which I iterate over, checking whether each id exists in hbaseRDD:
4929103, ["2015-05-21 10:03:44","EV01","col2","col3"]
4929104, ["2015-05-21 10:03:44","EV02","col2","col3"]
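The two RDDs are keyed differently: hbaseRDD keys carry a "-ACTIVE" suffix, while refDataRDD keys are bare ids. A minimal sketch of aligning them, assuming every HBase row key has the form <id>-ACTIVE (as in the sample rows). rekeyById is a hypothetical helper shown on a plain Seq so it runs without Spark.

```scala
// Hypothetical helper: assumes HBase row keys look like "<id>-ACTIVE".
// stripSuffix leaves keys that lack the suffix unchanged.
def rekeyById[V](rows: Seq[(String, V)]): Seq[(String, V)] =
  rows.map { case (rowKey, values) => (rowKey.stripSuffix("-ACTIVE"), values) }

val sampleRows = Seq(
  ("4929101-ACTIVE", Array("4929101", "2015-05-20 10:02:44", "dummy1", "dummy2")),
  ("4929103-ACTIVE", Array("4929103", "2015-05-20 10:02:44", "dummy1", "dummy2")))

// Keys are now "4929101" and "4929103", the same form as the refDataRDD keys
val rekeyed = rekeyById(sampleRows)
```

The same lambda can go into hbaseRDD.map { case (rowKey, values) => (rowKey.stripSuffix("-ACTIVE"), values) }, after which both RDDs share one key format.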
Question:
- How can I check whether the key (tuple._1), e.g. "4929103", exists in hbaseRDD and get the corresponding values (tuple._2)?
- I cannot use the PairRDD lookup function inside rdd.filter: it throws "scala.MatchError: null", but it works outside.
val filteredRDD = rdd.filter(sqlRow => {
  val hbaseLookup = hbaseRDD.lookup(sqlRow(0).toString + "-ACTIVE")
  // if found, check if date1 of hbaseRDD < sqlRow(1)
  // else if not found, retain row
  true
})
I'm not sure whether this is the problem, but I also get an NPE when I change the lookup line to:
val sqlRowHbase = hbaseRDD.filter(row => {
Note: before these lines I call hbaseRDD.count, and hbaseRDD.lookup works fine outside of rdd.filter.
So basically I'm trying to look up a key in hbaseRDD and get the row key / values. Joining the two is a bit tricky because some of the values in both RDDs can be null, and which row gets saved with which data depends on many scenarios.
Assuming the set of a_id values you need to look up is contained in an RDD, I think you could use leftOuterJoin instead of iterating and looking up each value one by one.
I saw your comment above about the position of date1 possibly changing. I don't cover that below, but I think it should be handled before the actual lookup, by mapping each row into a consistent shape.
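One way to do that per-row mapping, sketched in plain Scala under two assumptions not stated in the thread: the id is always the first value, and date1 is the first value that looks like a yyyy-MM-dd HH:mm:ss timestamp. DatePattern and toIdAndDate are hypothetical names. The sketch also skips null values, so a null id or a missing date yields None instead of a MatchError or NPE.

```scala
// Hypothetical: date1 is recognized by its "yyyy-MM-dd HH:mm:ss" layout, wherever it sits
val DatePattern = """\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}""".r

// Returns (id, date1), or None if the row has no usable id or no date-like value
def toIdAndDate(values: Array[String]): Option[(String, String)] =
  for {
    id    <- values.headOption.flatMap(Option(_)) // empty row or null id -> None
    date1 <- values.find(v => v != null && DatePattern.pattern.matcher(v).matches())
  } yield (id, date1)
```

In Spark, hbaseRDD.flatMap { case (_, values) => toIdAndDate(values).toList } would then produce the (a_id, date1) pairs for the leftOuterJoin, dropping rows that cannot be normalized.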
If I understand the pseudocode correctly, you have an RDD of (id, date)
and want to update it by looking up the data in HBase: if a row is found in HBase for that id and its date is earlier than the one in refData, the date is updated. Is that right?
If so, suppose you have some ref data like:
val refData = sc.parallelize(Array(
("4929103","2015-05-21 10:03:44"),
("4929104","2015-05-21 10:03:44")
))
And some row data from HBase:
val hbaseRDD = sc.parallelize(Array(
("4929101-ACTIVE", Array("4929101","2015-05-20 10:02:44")),
("4929102-ACTIVE", Array("4929102","2015-05-20 10:02:44")),
("4929103-ACTIVE", Array("4929103","2015-05-20 10:02:44"))
))
Then you can look up every id from refData in HBase with a simple leftOuterJoin and, for every row found, update the date if necessary:
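import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Define the helper before using it (required in spark-shell / the REPL)
def chooseDate(refDate: String, rowDate: Option[String]): String = rowDate match {
  // if row not found in HBase: keep ref date
  case None => refDate
  // if found: parse both dates and keep the HBase date only if it is earlier
  case Some(rDate) =>
    val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    if (LocalDateTime.parse(rDate, fmt).isBefore(LocalDateTime.parse(refDate, fmt))) rDate
    else refDate
}

refData
  // looks up in HBase all rows whose a_id value matches the id in refData
  // (the trailing _* also matches rows with extra columns such as dummy1, dummy2)
  .leftOuterJoin(hbaseRDD.map { case (_, Array(a_id, date1, _*)) => (a_id, date1) })
  // update the date in refData if the date from HBase is earlier
  .map { case (id, (refDate, maybeRowDate)) => (id, chooseDate(refDate, maybeRowDate)) }
  .collect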
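Because leftOuterJoin keeps every refData key and wraps the HBase side in an Option, an id that is not found arrives as None rather than as a null. The minimal plain-Scala sketch below (no Spark needed) mimics that output shape on local Seqs with the sample data; leftOuterJoinLocal is a hypothetical stand-in, not a Spark API. It also relies on the fact that fixed-width yyyy-MM-dd HH:mm:ss timestamps sort chronologically as plain strings, so this comparison can skip parsing.

```scala
// Hypothetical local stand-in for leftOuterJoin: every left key is kept and
// paired with Some(rightValue) for each match, or with None if there is no match.
def leftOuterJoinLocal[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] = {
  val rightByKey: Map[K, Seq[W]] = right.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
  left.flatMap { case (k, v) =>
    val matches: Seq[Option[W]] = rightByKey.get(k) match {
      case Some(ws) => ws.map(w => Some(w))
      case None     => Seq(None)
    }
    matches.map(m => (k, (v, m)))
  }
}

val refLocal = Seq(
  ("4929103", "2015-05-21 10:03:44"),
  ("4929104", "2015-05-21 10:03:44"))
val hbaseLocal = Seq(
  ("4929101", "2015-05-20 10:02:44"),
  ("4929102", "2015-05-20 10:02:44"),
  ("4929103", "2015-05-20 10:02:44"))

// Keep the HBase date only when a row was found and its date is earlier
val updated = leftOuterJoinLocal(refLocal, hbaseLocal).map {
  case (id, (refDate, Some(rowDate))) if rowDate < refDate => (id, rowDate)
  case (id, (refDate, _))                                  => (id, refDate)
}
```

Here 4929103 is found in HBase with an earlier date, so it takes the HBase date, while 4929104 is not found and keeps its ref date.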