Spark RDD: look up values by key

I have an RDD converted from HBase:

val hbaseRDD: RDD[(String, Array[String])], where tuple._1 is the string row key and the array holds the values from HBase:

4929101-ACTIVE, ["4929101","2015-05-20 10:02:44","dummy1","dummy2"]
4929102-ACTIVE, ["4929102","2015-05-20 10:02:44","dummy1","dummy2"]
4929103-ACTIVE, ["4929103","2015-05-20 10:02:44","dummy1","dummy2"]

I also have a SchemaRDD (id, date1, col1, col2, col3) converted to

val refDataRDD: RDD[(String, Array[String])], which I will iterate over, checking whether each id exists in hbaseRDD:

4929103, ["2015-05-21 10:03:44","EV01","col2","col3"]
4929104, ["2015-05-21 10:03:44","EV02","col2","col3"]

Question:

  • How can I check whether the key (tuple._1), e.g. "4929103", exists in hbaseRDD and get the corresponding values (tuple._2)? I cannot use the PairRDD lookup function inside rdd.filter: it throws "scala.MatchError: null", although the same call works outside the filter.

    val filteredRDD = rdd.filter(sqlRow => {
      val hbaseLookup = hbaseRDD.lookup(sqlRow(0).toString + "-ACTIVE")
      // if found, check if date1 of hbaseRDD < sqlRow(1)
      // else if not found, retain row
      true
    })

  I'm not sure whether this is the problem, but I also get an NPE when I change the lookup line to:

    val sqlRowHbase = hbaseRDD.filter(row => {

  Note: before these lines I call hbaseRDD.count, and hbaseRDD.lookup works fine outside rdd.filter.

So basically, I'm trying to find a key in hbaseRDD and get its values. Joining the two RDDs is a bit tricky because some of the values in both can be null, and which row gets saved with which data depends on many scenarios.

1 answer


Assuming the set of a_id values you need to look up is contained in an RDD, I think you could use leftOuterJoin instead of iterating and looking up each value one by one.
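
To illustrate what such a join returns, here is a small sketch on plain Scala collections rather than Spark, using the sample ids and dates from the question. leftOuterJoinLocal is a hypothetical helper, not a Spark API; it only mimics the result shape of leftOuterJoin on a pair RDD: every key from the left side is kept, paired with Some(value) for each match on the right side, or with None when there is no match.

```scala
// Hypothetical local helper: mimics the result shape of Spark's
// PairRDDFunctions.leftOuterJoin on in-memory Seqs of pairs.
def leftOuterJoinLocal[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] = {
  // Index the right side by key; a key may map to several values
  val rightByKey: Map[K, Seq[W]] =
    right.groupBy(_._1).map { case (k, pairs) => (k, pairs.map(_._2)) }
  left.flatMap { case (k, v) =>
    rightByKey.get(k) match {
      // No match: keep the left row, with None on the right
      case None     => Seq((k, (v, Option.empty[W])))
      // One output row per matching right-side value
      case Some(ws) => ws.map(w => (k, (v, Some(w))))
    }
  }
}

// Sample data from the question: (id, date1) on both sides
val refData = Seq(("4929103", "2015-05-21 10:03:44"), ("4929104", "2015-05-21 10:03:44"))
val hbaseDates = Seq(
  ("4929101", "2015-05-20 10:02:44"),
  ("4929102", "2015-05-20 10:02:44"),
  ("4929103", "2015-05-20 10:02:44")
)

val joined = leftOuterJoinLocal(refData, hbaseDates)
// joined == Seq(("4929103", ("2015-05-21 10:03:44", Some("2015-05-20 10:02:44"))),
//               ("4929104", ("2015-05-21 10:03:44", None)))
```

Unlike this local version, Spark does not guarantee the order of the joined pairs, so nothing downstream should rely on it.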

I saw your comment above about the position of date1 possibly varying. I don't cover that below, although I think it should be handled before the actual lookup, with some kind of map over each row.
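
A minimal sketch of such a per-row map in plain Scala. It assumes, hypothetically, that a_id is always the first value and that the position of date1 is known up front as dateIndex; rows whose id or date is null or missing are dropped, which also sidesteps the null values mentioned in the question. toIdAndDate is an illustrative name, not part of any API.

```scala
// Hypothetical helper: extract (a_id, date1) from one HBase value array.
// Assumes a_id sits at index 0 and date1 at dateIndex; returns None when
// either field is missing or null, so such rows drop out under flatMap.
def toIdAndDate(values: Array[String], dateIndex: Int): Option[(String, String)] =
  for {
    id   <- values.lift(0).flatMap(Option(_))
    date <- values.lift(dateIndex).flatMap(Option(_))
  } yield (id, date)

// Sample rows shaped like hbaseRDD, including a null date and a short row
val rows = Seq(
  ("4929101-ACTIVE", Array("4929101", "2015-05-20 10:02:44", "dummy1", "dummy2")),
  ("4929102-ACTIVE", Array("4929102", null, "dummy1", "dummy2")),
  ("4929103-ACTIVE", Array("4929103"))
)

// Same shape as hbaseRDD.flatMap { case (_, values) => toIdAndDate(values, 1) }
val idAndDate = rows.flatMap { case (_, values) => toIdAndDate(values, 1) }
// idAndDate == Seq(("4929101", "2015-05-20 10:02:44"))
```

Whether dropping incomplete rows is the right behaviour depends on your scenarios; returning a default date instead would keep them in the join.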

If I understand your pseudocode correctly, you have an RDD of (id, date) and want to update it by looking up the data in HBase: if a row is found in HBase for that id and its date is earlier than the one in refData, the date is updated. Is that right?

If so, suppose you have some ref data like:

val refData = sc.parallelize(Array(
 ("4929103","2015-05-21 10:03:44"),
 ("4929104","2015-05-21 10:03:44")
))

And some row data from HBase:

val hbaseRDD = sc.parallelize(Array(
    ("4929101-ACTIVE", Array("4929101","2015-05-20 10:02:44")),
    ("4929102-ACTIVE", Array("4929102","2015-05-20 10:02:44")),
    ("4929103-ACTIVE", Array("4929103","2015-05-20 10:02:44"))
))

Then you can look up every id from refData in HBase with a simple leftOuterJoin and, for every row found, update the date if necessary:

def chooseDate(refDate: String, rowDate: Option[String]): String = rowDate match {

  // if row not found in HBase: keep ref date
  case None => refDate

  // row found: keep the earlier date. Comparing the strings directly is only
  // chronological because both use the fixed "yyyy-MM-dd HH:mm:ss" format;
  // parse the dates first if that format is not guaranteed
  case Some(rDate) =>
    if (rDate < refDate) rDate
    else refDate
}

refData
  // looks up in HBase all rows whose a_id value matches an id in refData;
  // "_*" ignores any columns after date1 (dummy1, dummy2, ... in the real data)
  .leftOuterJoin(hbaseRDD.map { case (_, Array(a_id, date1, _*)) => (a_id, date1) })
  // update the date in refData if the date from HBase is earlier
  .map { case (id, (refDate, maybeRowDate)) => (id, chooseDate(refDate, maybeRowDate)) }
  .collect
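
Parsing the timestamps before comparing them is more robust than comparing strings, since malformed values fail loudly instead of silently comparing wrong. A sketch using java.time, assuming both sides use the yyyy-MM-dd HH:mm:ss pattern seen in the sample rows; chooseDateParsed is a hypothetical name.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical variant of chooseDate that parses both timestamps first.
// Assumes the "yyyy-MM-dd HH:mm:ss" pattern used in the sample rows;
// LocalDateTime.parse throws DateTimeParseException on anything else.
def chooseDateParsed(refDate: String, rowDate: Option[String]): String = rowDate match {
  // row not found in HBase: keep the ref date
  case None => refDate
  // row found: keep whichever date is earlier
  case Some(rDate) =>
    // formatter built inside the function, so the closure needs no outer object
    val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    if (LocalDateTime.parse(rDate, fmt).isBefore(LocalDateTime.parse(refDate, fmt))) rDate
    else refDate
}
```

It drops into the pipeline as .map { case (id, (refDate, maybeRowDate)) => (id, chooseDateParsed(refDate, maybeRowDate)) }. With the sample data, id 4929103 takes 2015-05-20 10:02:44 from HBase, while 4929104 has no HBase row and keeps 2015-05-21 10:03:44.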