Spark insert in HBase slow
I am inserting into HBase using Spark, but it is slow: it takes 2-3 minutes for 60,000 records, and I still have about 10 million records left to load.
object WriteToHbase extends Serializable {
  def main(args: Array[String]) {
    val csvRows: RDD[Array[String]] = ...
    val dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
    val usersRDD = csvRows.map(row => {
      new UserTable(row(0), row(1), row(2), row(9), row(10), row(11))
    })
    processUsers(sc, usersRDD, dateFormatter)
  }
}
def processUsers(sc: SparkContext, usersRDD: RDD[UserTable], dateFormatter: DateTimeFormatter): Unit = {
  usersRDD.foreachPartition(part => {
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, tablename)
    part.foreach(userRow => {
      val id = userRow.id
      val name = userRow.name
      val date1 = dateFormatter.parseDateTime(userRow.date1)
      val hRow = new Put(Bytes.toBytes(id))
      hRow.add(cf, q, Bytes.toBytes(date1))
      hRow.add(cf, q, Bytes.toBytes(name))
      ...
      table.put(hRow)
    })
    table.flushCommits()
    table.close()
  })
}
I am running it with spark-submit using:
--num-executors 2 --driver-memory 2G --executor-memory 2G --executor-cores 2
HTable has a batch API; you can try sending the puts in batches of 100-500. I think it might speed you up a bit. It returns an individual result for each operation, so you can check for failed puts if you like:
public void batch(List<? extends Row> actions, Object[] results)
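As an illustration only, here is a minimal Scala sketch of that idea inside foreachPartition, reusing usersRDD, tablename, cf and q from the question; the chunk size of 500 and the use of the older HTable/Put.add client API are assumptions:

import java.util.{ArrayList => JArrayList}

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put, Row}
import org.apache.hadoop.hbase.util.Bytes

usersRDD.foreachPartition(part => {
  // one connection per partition, as in the question
  val conf = HBaseConfiguration.create()
  val table = new HTable(conf, tablename)

  // send the puts in chunks of ~500 instead of one RPC per row
  part.grouped(500).foreach { chunk =>
    val actions = new JArrayList[Row](chunk.size)
    chunk.foreach { userRow =>
      val hRow = new Put(Bytes.toBytes(userRow.id))
      hRow.add(cf, q, Bytes.toBytes(userRow.name))
      actions.add(hRow)
    }
    // one result slot per action; inspect these to find failed puts
    val results = new Array[AnyRef](actions.size())
    table.batch(actions, results)
  }
  table.close()
})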
You need to look at an approach where you distribute your incoming data across the Spark job. Instead of your current foreachPartition approach, look at transformations like map / mapToPair, and evaluate the whole DAG lifecycle to see where you can save more time.
After that, based on the parallelism achieved, you can call the saveAsNewAPIHadoopDataset Spark action to write to HBase faster and in parallel. Like this:
JavaPairRDD<ImmutableBytesWritable, Put> yourFinalRDD = yourRDD.<your transformation, e.g. mapToPair to (rowKey, Put)>;
yourFinalRDD.saveAsNewAPIHadoopDataset(yourHBaseConfiguration);
Note: yourHBaseConfiguration should be a singleton, the only such object on an executor node, shared between the tasks running there.
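The pseudocode above is Java; as a rough Scala sketch against the question's usersRDD (keeping tablename, cf and q from the question, and assuming the older Put.add client API), the same idea could look like this:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

// build the Hadoop output configuration once, on the driver
val hConf = HBaseConfiguration.create()
hConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
val job = Job.getInstance(hConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])

// turn each record into a (rowKey, Put) pair; Spark then writes all
// partitions to HBase in parallel, no explicit foreachPartition needed
val hbasePuts = usersRDD.map { userRow =>
  val put = new Put(Bytes.toBytes(userRow.id))
  put.add(cf, q, Bytes.toBytes(userRow.name))
  (new ImmutableBytesWritable(Bytes.toBytes(userRow.id)), put)
}
hbasePuts.saveAsNewAPIHadoopDataset(job.getConfiguration)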
Please let me know if this pseudocode doesn't work for you or if you run into any difficulty with it.