Spark Streaming: HBase connection is closed when using hbaseMapPartition

In my Spark Streaming app I am using HBaseContext to put some values into HBase, one put operation for each processed message.

If I use hbaseForeachPartition, everything works fine:

 dStream
  .hbaseForeachPartition(
    hbaseContext,
    (iterator, connection) => {
      val table = connection.getTable(TableName.valueOf("namespace:table"))
      // putHBaseAndOther is an external function in the same Scala object
      val results = iterator.flatMap(packet => putHBaseAndOther(packet))
      table.close()
      results
    }
 )

With hbaseMapPartition, however, the HBase connection turns out to be closed:

 dStream
  .hbaseMapPartition(
    hbaseContext,
    (iterator, connection) => {
      val table = connection.getTable(TableName.valueOf("namespace:table"))
      // putHBaseAndOther is an external function in the same Scala object
      val results = iterator.flatMap(packet => putHBaseAndOther(packet))
      table.close()
      results
    }
 )

Can anyone please explain to me why?
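
My guess (and it is only a guess) is that this is a laziness problem: iterator.flatMap is lazy, and hbaseMapPartition hands the resulting iterator back to Spark, which only consumes it after my function has returned and HBaseContext has already closed the connection, whereas hbaseForeachPartition returns nothing to be consumed later. If that is right, eagerly materializing the results inside the function should avoid it; a rough, untested sketch of what I mean:

 dStream
  .hbaseMapPartition(
    hbaseContext,
    (iterator, connection) => {
      val table = connection.getTable(TableName.valueOf("namespace:table"))
      // toList forces all HBase calls to run while the connection is
      // still open; the bare flatMap iterator would only be evaluated
      // downstream, after the connection is closed.
      val results = iterator.flatMap(packet => putHBaseAndOther(packet)).toList
      table.close()
      results.iterator
    }
 )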
