Companion code for Vimeo's Medium blog post "Too big to query: how to query HBase with minimal pain".
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter.RowRange
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import scala.collection.JavaConversions._

object HBaseScanGenerator {

  // Serialize a Scan so it can be handed to TableInputFormat through the Hadoop configuration.
  val convertScanToString = (scan: Scan) => Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray)

  // A logical range over the composite row key, expressed as raw key values.
  case class SCAN_RANGE(start: Seq[Any], end: Seq[Any])

  // Build one Scan per salt bucket per range, so every bucket of a salted Phoenix table is covered.
  def createScans(number_buckets: Int = 100, scan_ranges: List[SCAN_RANGE]): List[Scan] =
    scan_ranges.flatMap { range =>
      (0 until number_buckets).map { bucket =>
        val scan = new Scan
        val rowKeyGenerator = (keys: Seq[Any]) => PhoneixRowKeyGenerator.generateRowKey(Some(bucket))(keys: _*)
        scan.setStartRow(rowKeyGenerator(range.start))
        scan.setStopRow(rowKeyGenerator(range.end))
        scan
      }
    }
}
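A brief usage sketch: the composite key (id: Long, day: java.sql.Date), the bucket count, and the values below are illustrative, not from the original post.

import java.sql.Date
import HBaseScanGenerator.SCAN_RANGE

// One month of data for a single id, across 16 salt buckets => 16 Scans.
val ranges = List(
  SCAN_RANGE(
    start = Seq(12345L, Date.valueOf("2019-01-01")),
    end   = Seq(12345L, Date.valueOf("2019-02-01"))))
val scans = HBaseScanGenerator.createScans(number_buckets = 16, scan_ranges = ranges)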
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import scala.collection.JavaConverters._
import HBaseScanGenerator.{createScans, SCAN_RANGE}
import PhoneixSchemaGenerator.value_to_phoneix_convertor

object HBaseSnapQuery {

  case class HbaseValues(rowkeyVal: Seq[String], values: Map[String, String])

  // number_buckets, root_dir, zkq, nameService and snap_dir are cluster settings
  // supplied by configuration and not shown in the gist.
  def scanPhoneixData(spark: SparkSession)
                     (snapname: String, scan_ranges: List[SCAN_RANGE])
                     (rdd_transformation: RDD[HbaseValues] => RDD[HbaseValues] = identity[RDD[HbaseValues]]): Dataset[HbaseValues] = {
    import spark.sqlContext.implicits._
    val scans = createScans(number_buckets, scan_ranges)
    scans.par.map { scan =>
      // Each Scan gets its own job configuration pointing at the HBase snapshot.
      val hadoop_conf = HBaseConfiguration.create()
      hadoop_conf.set("hbase.rootdir", root_dir)
      hadoop_conf.set("hbase.zookeeper.quorum", zkq)
      hadoop_conf.set("fs.defaultFS", s"hdfs://${nameService}")
      hadoop_conf.set("mapreduce.jobtracker.address", nameService)
      //...
      hadoop_conf.set(TableInputFormat.SCAN, HBaseScanGenerator.convertScanToString(scan))
      val job = Job.getInstance(hadoop_conf)
      TableSnapshotInputFormat.setInput(job, snapname, new Path(snap_dir))
      val hBaseRDD = spark.sparkContext.newAPIHadoopRDD(job.getConfiguration,
        classOf[TableSnapshotInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])
      hBaseRDD.map { row =>
        val hbaseQueryResult = row._2.listCells().asScala
        // Rebuild the Phoenix row-key schema from a sample key, then decode the
        // composite row key, dropping the leading salt byte.
        val schema = PhoneixSchemaGenerator.getGenericSchema(scan_ranges.head.start: _*)
        // getColumnValues (a helper not included in the gist) decodes each key column to a string.
        val rowKey: Seq[String] =
          PhoneixSchemaGenerator.getColumnValues(schema)(CellUtil.cloneRow(hbaseQueryResult.head).drop(1))
        val mp: Map[String, String] = hbaseQueryResult.map { cell =>
          val column = Bytes.toString(CellUtil.cloneQualifier(cell))
          val rowValue = PhoneixRowKeyGenerator.rowKeyValueCellTo(column, cell).toString
          column -> rowValue
        }.toMap
        HbaseValues(rowKey, mp)
      }
    }.toList.foldLeft(rdd_transformation(spark.sparkContext.emptyRDD[HbaseValues])) { (acc, rdds) =>
      // Apply the caller-supplied transformation to each bucket's RDD, then union everything.
      val rdd_joined_transformed = rdd_transformation(spark.sparkContext.union(rdds))
      acc.union(rdd_joined_transformed)
    }.toDF().as[HbaseValues]
  }
}
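A usage sketch, reusing the ranges built above; the Spark session setup, snapshot name, and "plays" qualifier are illustrative, not from the original post.

val spark = SparkSession.builder().appName("hbase-snapshot-query").getOrCreate()

// Read the snapshot and keep only rows that carry a "plays" qualifier.
val ds = HBaseSnapQuery.scanPhoneixData(spark)("my_table_snapshot", ranges) { rdd =>
  rdd.filter(_.values.contains("plays"))
}
ds.show(10)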
import org.apache.hadoop.hbase.{Cell, CellUtil}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.phoenix.schema.SortOrder
import org.apache.phoenix.schema.types._
import org.apache.phoenix.util.ByteUtil
import java.sql.Date
import scala.collection.mutable.ArrayBuffer

object PhoneixRowKeyGenerator {

  // Encode a single key value with the Phoenix codec that matches its runtime type.
  def convertToByteArray(key: Any): Array[Byte] = key match {
    case x: Long => PLong.INSTANCE.toBytes(x)
    case x: Int => PInteger.INSTANCE.toBytes(x)
    case x: Double => PDouble.INSTANCE.toBytes(x)
    case x: Date => PDate.INSTANCE.toBytes(x)
    case x: String => PVarchar.INSTANCE.toBytes(x)
    case x: Char => PChar.INSTANCE.toBytes(x)
    case _ => ByteUtil.EMPTY_BYTE_ARRAY
  }

  // Encode every composite-key column, then prepend the salt byte, if any.
  def generateRowKey(bucket: Option[Int] = None)(compositeKeys: Any*): Array[Byte] = {
    val keys = compositeKeys.tail.foldLeft(ArrayBuffer[Array[Byte]](convertToByteArray(compositeKeys.head))) {
      (byteArrays, rowKeyColumn) => byteArrays += convertToByteArray(rowKeyColumn)
    }
    updateSaltingByte(bucket)(keys)
  }

  def updateSaltingByte(bucket: Option[Int] = None)(rowKey: ArrayBuffer[Array[Byte]]): Array[Byte] = {
    bucket.map(salt => ByteUtil.concat(Array[Byte](salt.toByte), rowKey: _*))
      .getOrElse(ByteUtil.concat(rowKey.head, rowKey.tail: _*))
  }

  // Decode a cell value according to the column it belongs to.
  // VarcharColumns, IntColumns and DoubleColumns are column-name sets defined elsewhere in the project.
  def rowKeyValueCellTo(column: String, cell: Cell) = {
    val bytes = CellUtil.cloneValue(cell)
    column match {
      case col if VarcharColumns.contains(col) => // string columns
        Bytes.toString(bytes)
      case col if IntColumns.contains(col) => // integer columns
        PInteger.INSTANCE.getCodec.decodeInt(bytes, 0, SortOrder.getDefault)
      case col if DoubleColumns.contains(col) =>
        PDouble.INSTANCE.getCodec.decodeDouble(bytes, 0, SortOrder.getDefault)
      //...
    }
  }
}
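For intuition, a tiny sketch of the salted key layout; the key values are illustrative.

// Encode (id = 42L, name = "abc") for salt bucket 3:
// result = [0x03] ++ PLong bytes of 42 ++ PVarchar bytes of "abc".
val saltedKey: Array[Byte] = PhoneixRowKeyGenerator.generateRowKey(Some(3))(42L, "abc")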
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder
import org.apache.phoenix.schema.ValueSchema.Field
import org.apache.phoenix.schema.types._
import org.apache.phoenix.schema.{RowKeySchema, SortOrder}
import java.sql.Date

object PhoneixSchemaGenerator {

  // Map sample key values to Phoenix type descriptors so a RowKeySchema can be
  // built from an example composite key. GenericPDatum is not included in the gist.
  implicit def value_to_phoneix_convertor(arr: scala.collection.Seq[Any]): Seq[GenericPDatum] = arr.map {
    case _: Long => GenericPDatum.pDatum(PLong.INSTANCE)
    case _: Int => GenericPDatum.pDatum(PInteger.INSTANCE)
    case _: Double => GenericPDatum.pDatum(PDouble.INSTANCE)
    case _: Date => GenericPDatum.pDatum(PDate.INSTANCE)
    case _: String => GenericPDatum.pDatum(PVarchar.INSTANCE)
    case _: Char => GenericPDatum.pDatum(PChar.INSTANCE)
    case _ => GenericPDatum.pDatum(PLong.INSTANCE)
  }

  def getGenericSchema(keys: GenericPDatum*): RowKeySchema = {
    val builder: RowKeySchemaBuilder = new RowKeySchemaBuilder(keys.length)
    keys.foreach(key => builder.addField(key, false, SortOrder.getDefault))
    builder.rowKeyOrderOptimizable(false)
    builder.build()
  }
}
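A short usage note: thanks to the implicit conversion, the schema can be built straight from an example composite key. The values below are illustrative.

import PhoneixSchemaGenerator._

// Builds a two-field schema (PLong, PVarchar) from sample key values.
val schema = PhoneixSchemaGenerator.getGenericSchema(Seq(42L, "abc"): _*)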
Can you please provide the GenericPDatum class?
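For reference, a minimal sketch of what GenericPDatum could look like, assuming it simply wraps a Phoenix PDataType as a non-nullable PDatum; this is a guess for illustration, not the author's original class.

// A guessed stand-in for the missing class: a PDatum wrapper around a Phoenix
// PDataType, just enough for RowKeySchemaBuilder.addField.
import org.apache.phoenix.schema.{PDatum, SortOrder}
import org.apache.phoenix.schema.types.PDataType

case class GenericPDatum(dataType: PDataType[_]) extends PDatum {
  override def isNullable: Boolean = false
  override def getDataType: PDataType[_] = dataType
  override def getMaxLength: Integer = null
  override def getScale: Integer = null
  override def getSortOrder: SortOrder = SortOrder.getDefault
}

object GenericPDatum {
  def pDatum(dataType: PDataType[_]): GenericPDatum = GenericPDatum(dataType)
}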