Spark insert into HBase is slow

I am inserting into HBase using Spark, but it is slow: it takes 2-3 minutes for 60,000 entries, and I still have about 10 million records left to write.

object WriteToHbase extends Serializable {
    def main(args: Array[String]) {
        val csvRows: RDD[Array[String]] = ...
        val dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
        val usersRDD = csvRows.map(row => {
            new UserTable(row(0), row(1), row(2), row(9), row(10), row(11))
        })
        processUsers(sc, usersRDD, dateFormatter)
    }
}

def processUsers(sc: SparkContext, usersRDD: RDD[UserTable], dateFormatter: DateTimeFormatter): Unit = {

    usersRDD.foreachPartition(part => {
        val conf = HBaseConfiguration.create()
        val table = new HTable(conf, tablename)

        part.foreach(userRow => {
            val id = userRow.id
            val name = userRow.name
            val date1 = dateFormatter.parseDateTime(userRow.date1)
            val hRow = new Put(Bytes.toBytes(id))
            hRow.add(cf, q, Bytes.toBytes(date1))
            hRow.add(cf, q, Bytes.toBytes(name))
            ...
            table.put(hRow)
        })
        table.flushCommits()
        table.close()
    })
}

      

These are the options I am passing to spark-submit:

--num-executors 2 --driver-memory 2G --executor-memory 2G --executor-cores 2 

      



3 answers


This is slow because the implementation does not exploit data locality: a partition of the Spark RDD sitting on one server may have to be written to an HBase RegionServer running on another server.



There is currently no Spark RDD operation that uses the HBase datastore efficiently.



There is a batch API in HTable. You can try sending the put requests in batches of 100-500; I think it might speed you up a bit. It returns an individual result for each operation, so you can check for failed puts if you like.

public void batch(List<? extends Row> actions, Object[] results)

      



https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/HTable.html#batch%28java.util.List,%20java.lang.Object%5B%5D%29
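Below is a minimal Scala sketch of how this could look inside the foreachPartition loop from the question. The 500-row batch size is an assumption to tune, and tablename, cf and q are the same (undisclosed) values used in the question:

import java.util.{ArrayList => JArrayList}

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put, Row}
import org.apache.hadoop.hbase.util.Bytes

usersRDD.foreachPartition(part => {
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, tablename)

    val batchSize = 500                        // assumed value; try anything in the 100-500 range
    val buffer = new JArrayList[Row](batchSize)

    // Sends everything currently buffered in a single batch call.
    def flushBatch(): Unit = {
        if (!buffer.isEmpty) {
            val results = new Array[AnyRef](buffer.size)
            table.batch(buffer, results)       // one round trip instead of one per Put
            // Entries left null in results correspond to failed operations.
            buffer.clear()
        }
    }

    part.foreach(userRow => {
        val hRow = new Put(Bytes.toBytes(userRow.id))
        hRow.add(cf, q, Bytes.toBytes(userRow.name))
        buffer.add(hRow)
        if (buffer.size >= batchSize) flushBatch()
    })

    flushBatch()                               // send the final, partially filled batch
    table.close()
})

A related knob, if you keep the per-row table.put calls, is table.setAutoFlush(false) together with a larger client write buffer, which also reduces the number of round trips.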



You need to look at an approach where you can distribute the incoming data across the Spark job. Instead of your current foreachPartition approach, look at transformations like map and mapToPair. You also need to evaluate the whole DAG lifecycle and see where you can save more time.

After that, based on the parallelism achieved, you can call the saveAsNewAPIHadoopDataset action of Spark to write into HBase faster and in parallel. For example:

JavaPairRDD<ImmutableBytesWritable, Put> yourFinalRDD = yourRDD.mapToPair(/* build (rowkey, Put) pairs */);
yourFinalRDD.saveAsNewAPIHadoopDataset(yourHBaseConfiguration);

      

Note: yourHBaseConfiguration should be a singleton, so that a single object on each executor node is shared between its tasks.

Please let me know if this pseudocode doesn't work for you or if you run into any difficulty with it.
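For concreteness, here is a hedged Scala sketch of the saveAsNewAPIHadoopDataset approach in the shape of the original question. TableOutputFormat comes from the HBase MapReduce integration, and tablename, cf, q and usersRDD are carried over from the question:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

// The Job instance is only used to stamp the output format class onto the configuration.
val job = Job.getInstance(hbaseConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

// Turn each user row into a (rowkey, Put) pair; the Put is built the same way as in the question.
val hbasePuts = usersRDD.map(userRow => {
    val put = new Put(Bytes.toBytes(userRow.id))
    put.add(cf, q, Bytes.toBytes(userRow.name))
    (new ImmutableBytesWritable(Bytes.toBytes(userRow.id)), put)
})

// The write is now a single distributed action executed by the executors in parallel.
hbasePuts.saveAsNewAPIHadoopDataset(job.getConfiguration)

The executors write directly to the RegionServers, so the number of partitions of hbasePuts controls how many concurrent writers you get.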







