Code accompanying Vimeo's Medium blog post "Too big to query: how to query HBase with minimal pain" (gist by @zeev1079).
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter.RowRange
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import scala.collection.JavaConversions._

object HBaseScanGenerator {

  // Serialize a Scan so it can be passed to TableInputFormat through the job configuration
  val convertScanToString = (scan: Scan) => Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray)

  case class SCAN_RANGE(start: Seq[Any], end: Seq[Any])

  // Build one Scan per salt bucket for every requested key range
  def createScans(number_buckets: Int = 100, scan_ranges: List[SCAN_RANGE]): List[Scan] =
    scan_ranges.flatMap { range =>
      (0 until number_buckets).map { bucket =>
        val scan = new Scan
        val rowKeyGenerator = (keys: Seq[Any]) => PhoneixRowKeyGenerator.generateRowKey(Some(bucket))(keys: _*)
        scan.setStartRow(rowKeyGenerator(range.start))
        scan.setStopRow(rowKeyGenerator(range.end))
        scan
      }
    }
}
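As a quick illustration of how the generator is meant to be used, the sketch below builds scans for a made-up composite key of (user_id: Long, event_date: Date); the bucket count and range values are placeholders, not taken from the blog.

import java.sql.Date
import HBaseScanGenerator.SCAN_RANGE

object ScanGeneratorExample {
  def main(args: Array[String]): Unit = {
    // One logical range over a hypothetical (user_id, event_date) composite key
    val ranges = List(
      SCAN_RANGE(
        start = Seq(42L, Date.valueOf("2020-01-01")),
        end = Seq(42L, Date.valueOf("2020-02-01"))))
    // 100 salt buckets x 1 range = 100 Scan objects, one per bucket
    val scans = HBaseScanGenerator.createScans(number_buckets = 100, scan_ranges = ranges)
    println(s"generated ${scans.size} scans")
  }
}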
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import scala.collection.JavaConverters._

import HBaseScanGenerator.SCAN_RANGE
import PhoneixSchemaGenerator.value_to_phoneix_convertor

object HBaseSnapQuery {

  case class HbaseValues(rowkeyVal: Seq[String], values: Map[String, String])

  // number_buckets, root_dir, zkq, nameService and snap_dir are cluster-specific values defined elsewhere
  def scanPhoneixData(spark: SparkSession)(snapname: String, scan_ranges: List[SCAN_RANGE])(rdd_transformation: RDD[HbaseValues] => RDD[HbaseValues] = identity[RDD[HbaseValues]]): Dataset[HbaseValues] = {
    import spark.sqlContext.implicits._
    val scans = HBaseScanGenerator.createScans(number_buckets, scan_ranges)
    scans.par.map { scan =>
      val hadoop_conf = HBaseConfiguration.create()
      hadoop_conf.set("hbase.rootdir", root_dir)
      hadoop_conf.set("hbase.zookeeper.quorum", zkq)
      hadoop_conf.set("fs.defaultFS", s"hdfs://${nameService}")
      hadoop_conf.set("mapreduce.jobtracker.address", nameService)
      //...
      hadoop_conf.set(TableInputFormat.SCAN, HBaseScanGenerator.convertScanToString(scan))
      val job = Job.getInstance(hadoop_conf)
      // Read directly from the snapshot files on HDFS instead of going through the region servers
      TableSnapshotInputFormat.setInput(job, snapname, new Path(snap_dir))
      val hBaseRDD = spark.sparkContext.newAPIHadoopRDD(job.getConfiguration,
        classOf[TableSnapshotInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])
      hBaseRDD.map { row =>
        val hbaseQueryResult = row._2.listCells().asScala
        val schema = PhoneixSchemaGenerator.getGenericSchema(scan_ranges.head.start: _*)
        // getColumnValues (not shown in this gist) decodes the composite row key; drop(1) removes the leading salt byte
        val rowKey: Seq[String] = PhoneixSchemaGenerator.getColumnValues(schema)(CellUtil.cloneRow(hbaseQueryResult.head).drop(1))
        val mp: Map[String, String] = hbaseQueryResult.map { cell =>
          val column = Bytes.toString(CellUtil.cloneQualifier(cell))
          val rowValue = PhoneixRowKeyGenerator.rowKeyValueCellTo(column, cell).toString
          column -> rowValue
        }.toMap
        HbaseValues(rowKey, mp)
      }
    }.toList.foldLeft(rdd_transformation(spark.sparkContext.emptyRDD[HbaseValues])) { (acc, rdds) =>
      val rdd_joined_transformed = rdd_transformation(spark.sparkContext.union(rdds))
      acc.union(rdd_joined_transformed)
    }.toDF().as[HbaseValues]
  }
}
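A rough usage sketch, assuming the cluster settings referenced above are in scope; the snapshot name and the filter in the transformation step are invented for the example.

import org.apache.spark.sql.SparkSession
import HBaseScanGenerator.SCAN_RANGE

object SnapQueryExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hbase-snapshot-query").getOrCreate()
    // Placeholder key range; in practice this would mirror the table's composite row key
    val ranges = List(SCAN_RANGE(start = Seq(42L), end = Seq(43L)))
    // "events_snapshot" is a placeholder snapshot name; the optional third argument list
    // lets the caller transform each per-scan RDD before the results are unioned
    val ds = HBaseSnapQuery.scanPhoneixData(spark)("events_snapshot", ranges) { rdd =>
      rdd.filter(_.values.nonEmpty)
    }
    ds.show(10)
  }
}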
import org.apache.hadoop.hbase.{Cell, CellUtil}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.phoenix.schema.SortOrder
import org.apache.phoenix.schema.types._
import org.apache.phoenix.util.ByteUtil
import java.sql.Date
import scala.collection.mutable.ArrayBuffer

object PhoneixRowKeyGenerator {

  // Encode a single row-key component with the matching Phoenix type codec
  def convertToByteArray(key: Any): Array[Byte] = key match {
    case x: Long => PLong.INSTANCE.toBytes(x)
    case x: Int => PInteger.INSTANCE.toBytes(x)
    case x: Double => PDouble.INSTANCE.toBytes(x)
    case x: Date => PDate.INSTANCE.toBytes(x)
    case x: String => PVarchar.INSTANCE.toBytes(x)
    case x: Char => PChar.INSTANCE.toBytes(x)
    case _ => ByteUtil.EMPTY_BYTE_ARRAY
  }

  // Encode every composite-key component, then prepend the salt byte when a bucket is given
  def generateRowKey(bucket: Option[Int] = None)(compositeKeys: Any*): Array[Byte] = {
    val keys = compositeKeys.tail.foldLeft(ArrayBuffer[Array[Byte]](convertToByteArray(compositeKeys.head))) { (byteArray, rowKeyColumn) =>
      byteArray += convertToByteArray(rowKeyColumn)
    }
    updateSaltingByte(bucket)(keys)
  }

  def updateSaltingByte(bucket: Option[Int] = None)(rowKey: ArrayBuffer[Array[Byte]]): Array[Byte] =
    bucket.map(salt => ByteUtil.concat(Array[Byte](salt.toByte), rowKey: _*))
      .getOrElse(ByteUtil.concat(rowKey.head, rowKey.tail: _*))

  // VarcharColumns, IntColumns and DoubleColumns are sets of column names defined elsewhere in the project
  def rowKeyValueCellTo(column: String, cell: Cell) = {
    val bytes = CellUtil.cloneValue(cell)
    column match {
      case col if VarcharColumns.contains(col) => // for string cols
        Bytes.toString(bytes)
      case col if IntColumns.contains(col) => // for Int cols
        PInteger.INSTANCE.getCodec.decodeInt(bytes, 0, SortOrder.getDefault)
      case col if DoubleColumns.contains(col) =>
        PDouble.INSTANCE.getCodec.decodeDouble(bytes, 0, SortOrder.getDefault)
      //...
    }
  }
}
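A small sketch of the row-key encoding, using a made-up (user_id: Long, country: String) key and salt bucket; the values are illustrative only.

object RowKeyExample {
  def main(args: Array[String]): Unit = {
    // Salted: one salt byte followed by the Phoenix-encoded key components
    val salted = PhoneixRowKeyGenerator.generateRowKey(Some(7))(42L, "US")
    // Unsalted: only the encoded components are concatenated
    val unsalted = PhoneixRowKeyGenerator.generateRowKey()(42L, "US")
    println(s"salted = ${salted.length} bytes, unsalted = ${unsalted.length} bytes") // salted is exactly one byte longer
  }
}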
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder
import org.apache.phoenix.schema.ValueSchema.Field
import org.apache.phoenix.schema.types._
import org.apache.phoenix.schema.{RowKeySchema, SortOrder}
import java.sql.Date

object PhoneixSchemaGenerator {

  // Map Scala values to the Phoenix column types that describe the row-key layout
  implicit def value_to_phoneix_convertor(arr: scala.collection.Seq[Any]): Seq[GenericPDatum] = arr.map {
    case _: Long => GenericPDatum.pDatum(PLong.INSTANCE)
    case _: Int => GenericPDatum.pDatum(PInteger.INSTANCE)
    case _: Double => GenericPDatum.pDatum(PDouble.INSTANCE)
    case _: Date => GenericPDatum.pDatum(PDate.INSTANCE)
    case _: String => GenericPDatum.pDatum(PVarchar.INSTANCE)
    case _: Char => GenericPDatum.pDatum(PChar.INSTANCE)
    case _ => GenericPDatum.pDatum(PLong.INSTANCE)
  }

  // Build a RowKeySchema so composite row-key bytes can be decoded back into column values
  def getGenericSchema(keys: GenericPDatum*) = {
    val builder: RowKeySchemaBuilder = new RowKeySchemaBuilder(keys.length)
    keys.foreach(key => builder.addField(key, false, SortOrder.getDefault))
    builder.rowKeyOrderOptimizable(false)
    builder.build()
  }
}
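GenericPDatum is referenced above but is not included in the gist (see the comment below). A minimal reconstruction might look like the following, assuming it only needs to satisfy Phoenix's PDatum contract (isNullable, getDataType, getMaxLength, getScale, getSortOrder) so that RowKeySchemaBuilder.addField accepts it; this is a guess at the shape of the class, not the author's original.

import org.apache.phoenix.schema.{PDatum, SortOrder}
import org.apache.phoenix.schema.types.PDataType

// Hypothetical reconstruction: a thin PDatum wrapper around a Phoenix data type,
// carrying just enough information for RowKeySchemaBuilder.addField
case class GenericPDatum(dataType: PDataType[_]) extends PDatum {
  override def isNullable(): Boolean = true
  override def getDataType(): PDataType[_] = dataType
  override def getMaxLength(): Integer = null
  override def getScale(): Integer = null
  override def getSortOrder(): SortOrder = SortOrder.getDefault
}

object GenericPDatum {
  def pDatum(dataType: PDataType[_]): GenericPDatum = GenericPDatum(dataType)
}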
@LeKiet258

Can you please provide the GenericPDatum class?
