Skip to content

Instantly share code, notes, and snippets.

@travishegner
Created June 15, 2018 13:22
Show Gist options
  • Save travishegner/12cf5542881ca8fac6377cc56c7a7ea1 to your computer and use it in GitHub Desktop.
Save travishegner/12cf5542881ca8fac6377cc56c7a7ea1 to your computer and use it in GitHub Desktop.
HBase Scala object for writing and deleting from Spark
package com.trilliumstaffing.hadoop.tools
import java.nio.ByteBuffer
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.spark.sql.Dataset
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._
object HBase {
/**
 * Implicit conversions between common value types and the raw byte arrays
 * HBase stores, routed through the [[HBaseData]] wrapper.
 */
object implicits {

  /** Renders a Scan as the Base64 text of its protobuf serialization. */
  implicit def scanToString(v: Scan): String =
    Base64.encodeBytes(ProtobufUtil.toScan(v).toByteArray)

  /**
   * Wrapper around a raw byte array offering typed decoders.
   *
   * Each decoder interprets `bytes` with the matching `Bytes.toXxx` helper;
   * no validation of the array length is performed here.
   */
  case class HBaseData(bytes: Array[Byte]) {
    implicit def toBigDecimal: BigDecimal = Bytes.toBigDecimal(bytes)
    implicit def toBoolean: Boolean = Bytes.toBoolean(bytes)
    implicit def toDouble: Double = Bytes.toDouble(bytes)
    implicit def toFloat: Float = Bytes.toFloat(bytes)
    implicit def toInt: Int = Bytes.toInt(bytes)
    implicit def toLong: Long = Bytes.toLong(bytes)
    implicit def toShort: Short = Bytes.toShort(bytes)

    /** Decodes the bytes as consecutive 8-byte doubles into a dense vector. */
    implicit def toVector: Vector = {
      val source = ByteBuffer.wrap(bytes).asDoubleBuffer
      // Integer division: trailing bytes that do not form a full double are ignored.
      val values = new Array[Double](bytes.length / 8)
      source.get(values)
      Vectors.dense(values)
    }

    implicit def hbdToString: String = Bytes.toString(bytes)
  }

  /** Converts any value that has a view to [[HBaseData]] into its raw bytes. */
  implicit def toBA[T](v: T)(implicit f: T => HBaseData): Array[Byte] = f(v).bytes

  // Encoders: one overload per supported source type.
  implicit def toHbd(v: Array[Byte]): HBaseData = HBaseData(v)
  implicit def toHbd(v: BigDecimal): HBaseData = HBaseData(Bytes.toBytes(v.bigDecimal))
  implicit def toHbd(v: Boolean): HBaseData = HBaseData(Bytes.toBytes(v))
  implicit def toHbd(v: ByteBuffer): HBaseData = HBaseData(Bytes.toBytes(v))
  implicit def toHbd(v: Double): HBaseData = HBaseData(Bytes.toBytes(v))
  implicit def toHbd(v: Float): HBaseData = HBaseData(Bytes.toBytes(v))
  implicit def toHbd(v: Int): HBaseData = HBaseData(Bytes.toBytes(v))
  implicit def toHbd(v: Long): HBaseData = HBaseData(Bytes.toBytes(v))
  implicit def toHbd(v: Short): HBaseData = HBaseData(Bytes.toBytes(v))
  implicit def toHbd(v: String): HBaseData = HBaseData(Bytes.toBytes(v))

  /** Encodes every element of the vector as an 8-byte double, in order. */
  implicit def toHbd(v: Vector): HBaseData = {
    val buffer = ByteBuffer.allocate(8 * v.size)
    // The double view shares the backing array, so the writes land in `buffer`.
    buffer.asDoubleBuffer.put(v.toArray)
    HBaseData(buffer.array())
  }
}
import implicits._
// Cached connection shared by all callers in this JVM. Only read or written
// while holding this object's monitor (see getHBase).
private var hbase: Option[Connection] = None

/**
 * Returns the shared HBase connection, creating it on first use.
 *
 * The lookup is synchronized because `write` and `delete` call it from
 * `foreachPartition` closures, which may run concurrently inside one JVM;
 * without the lock, racing callers could each create a connection and all
 * but the last one stored would be leaked. A cached connection that has
 * since been closed is replaced with a fresh one.
 *
 * @return an open connection built from the default `HBaseConfiguration`
 */
def getHBase: Connection = synchronized {
  hbase.filterNot(_.isClosed) match {
    case Some(open) => open
    case None =>
      val created = ConnectionFactory.createConnection(HBaseConfiguration.create())
      hbase = Option(created)
      created
  }
}
/**
 * Writes every row of `ds` to the HBase table `tableName`.
 *
 * Each dataset element is a row key paired with a map of
 * column family -> (qualifier -> value). One `Put` is built per element and
 * puts are submitted in groups of at most `batch`.
 *
 * @param tableName name of the target table
 * @param ds        rows to write
 * @param batch     number of puts buffered before they are sent to the table
 * @param fk        view turning a row key into [[HBaseData]]
 * @param ff        view turning a column family into [[HBaseData]]
 * @param fq        view turning a qualifier into [[HBaseData]]
 * @param fv        view turning a cell value into [[HBaseData]]
 */
def write[TK, TF, TQ, TV](
  tableName: String,
  ds: Dataset[(TK, Map[TF, Map[TQ, TV]])],
  batch: Int = 1000
)(implicit
  fk: TK => HBaseData,
  ff: TF => HBaseData,
  fq: TQ => HBaseData,
  fv: TV => HBaseData
): Unit = {
  // The parameter type is spelled out so the Scala-function overload of
  // foreachPartition is selected unambiguously.
  ds.foreachPartition((p: Iterator[(TK, Map[TF, Map[TQ, TV]])]) => {
    val table = HBase.getHBase.getTable(TableName.valueOf(tableName))
    try {
      val puts = ArrayBuffer[Put]()

      // Sends whatever is buffered and empties the buffer.
      def flush(): Unit =
        if (puts.nonEmpty) {
          table.put(puts.asJava)
          puts.clear()
        }

      p.foreach(r => {
        val put = new Put(r._1)
        r._2.foreach { case (family, columns) =>
          columns.foreach { case (qualifier, value) =>
            put.addColumn(family, qualifier, value)
          }
        }
        puts += put
        if (puts.length >= batch) flush()
      })
      flush()
    } finally {
      // Release the table even when a put or a row conversion throws.
      table.close()
    }
  })
}
/**
 * Deletes the rows whose keys are in `ds` from the HBase table `tableName`.
 *
 * CAUTION: if `cf` is empty, the whole row is deleted; otherwise only the
 * listed column families are removed from each row.
 *
 * Two long accumulators obtained from `Accumulators` are updated: one is
 * incremented per row queued for deletion and one per batch submitted.
 * NOTE(review): `Accumulators` is not defined in this file; presumably a
 * project helper in the same package, so confirm.
 *
 * @param tableName name of the target table
 * @param ds        row keys to delete
 * @param cf        column families to delete; empty means the entire row
 * @param batch     number of deletes buffered before they are sent to the table
 */
def delete(
  tableName: String,
  ds: Dataset[HBaseData],
  cf: List[HBaseData],
  batch: Int = 1000
): Unit = {
  val aRows = Accumulators.getLongAccumulator("HBase Rows Deleted")
  val aBatches = Accumulators.getLongAccumulator("HBase Delete Batches Submitted")
  ds.foreachPartition((p: Iterator[HBaseData]) => {
    val table = HBase.getHBase.getTable(TableName.valueOf(tableName))
    try {
      // Must stay a mutable buffer: the HBase Table.delete(List) contract
      // removes successfully applied deletes from the list it is given.
      val dels = ArrayBuffer[Delete]()

      // Submits whatever is buffered as one batch and empties the buffer.
      def flush(): Unit =
        if (dels.nonEmpty) {
          aBatches.add(1L)
          table.delete(dels.asJava)
          dels.clear()
        }

      p.foreach(r => {
        val del = new Delete(r)
        cf.foreach(f => del.addFamily(f))
        dels += del
        aRows.add(1L)
        if (dels.length >= batch) flush()
      })
      flush()
    } finally {
      // Release the table even when a delete throws.
      table.close()
    }
  })
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment