@oluies
Last active December 18, 2017 09:26
HBase extract: kv._2 -> Map(column family name -> Map(column qualifier name -> value))
// Package inferred from the logged class name (com.bigindex.batch.hbase.StatsCount);
// a package named "hbase" is required for the private[hbase] modifiers below to compile.
package com.bigindex.batch.hbase

import java.time.Instant

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import org.slf4j.LoggerFactory
case class StatsCount(year: Int, month: Int, day: Int, dimension: String, dimensionkey: String)

object StatsCount {

  private val log = LoggerFactory.getLogger(getClass)
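
  /** Loads the stats counters for the given dimension over the [start, end) window and returns
   *  them as a typed Dataset[StatsCount]. */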
  def loadDs(start: Instant, end: Instant, conf: Configuration, curdim: String)(implicit spark: SparkSession): Dataset[StatsCount] = {
    val sc: RDD[StatsCount] = getStatsCountData(start, end, conf, curdim)
    import spark.implicits._
    sc.toDS()
  }
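
  // Type aliases: the raw scan RDD, HBase's Java view of a row, and the decoded
  // Scala shape (column family -> column qualifier -> value).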
  type HbaseResultRDD = RDD[(ImmutableBytesWritable, Result)]
  type HBaseRowNoVersionMap = java.util.NavigableMap[Array[Byte], java.util.NavigableMap[Array[Byte], Array[Byte]]]
  type NBILinageCountRow = Map[Array[Byte], Map[Array[Byte], Array[Byte]]]
  type NBILinageCountRowStrInt = scala.collection.immutable.Map[String, Map[String, BigInt]]
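
  /** Converts HBase's nested Java NavigableMaps into immutable Scala maps. Keys are still raw
   *  Array[Byte] (which compare by reference inside a Scala Map), so this shape is only a
   *  pass-through before rowToStrMap decodes the bytes. */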
  def navMapToMap(navMap: HBaseRowNoVersionMap): NBILinageCountRow = {
    import scala.collection.JavaConverters._
    navMap.asScala.toMap.map { case (family, columns) =>
      (family, columns.asScala.toMap)
    }
  }
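
  /** Decodes family and qualifier names to Strings and cell values to BigInt counters
   *  (BigInt(bytes) reads the value as a big-endian two's-complement integer). */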
  def rowToStrMap(navMap: NBILinageCountRow): NBILinageCountRowStrInt = {
    navMap.map { case (family, columns) =>
      (Bytes.toString(family), columns.map { case (qualifier, value) =>
        (Bytes.toString(qualifier), BigInt(value))
      })
    }
  }
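
  /** Scans bigindex.stats_<curdim>_count over the [start, end) row-key range and decodes each
   *  row into (row key, Map(family -> Map(qualifier -> count))). Converting those rows into
   *  StatsCount instances is still a TODO: the method currently ends in ???. */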
  private[hbase] def getStatsCountData(start: Instant, end: Instant, conf: Configuration, curdim: String)(implicit spark: SparkSession) = {
    val ns = "bigindex"
    val dimensionTable: String = s"${ns}.stats_${curdim}_count"
    val c = HBaseConfiguration.create(conf)
    createHBaseScan(start, end, c)
    c.set(TableInputFormat.INPUT_TABLE, dimensionTable)
    val results: HbaseResultRDD = spark.sparkContext.newAPIHadoopRDD(c,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    val mapredHBase: RDD[(String, NBILinageCountRowStrInt)] = results.map(kv =>
      (kv._1.get(), navMapToMap(kv._2.getNoVersionMap())) // getNoVersionMap drops the version dimension, so only the latest cell value is kept
    ).map(kv =>
      (kv._1, rowToStrMap(kv._2))
    ).map { kv =>
      /*
       kv._1 -> Array[Byte] row key
       kv._2 -> Map(column family name -> Map(column qualifier name -> value)), i.e. a NBILinageCountRowStrInt
      */
      (Bytes.toString(kv._1), kv._2)
    }
    import spark.implicits._
    //mapredHBase.toDF().show(100, false)
    log.info(mapredHBase.toString())
    // mapredHBase.groupBy(_._1).groupBy(_._1).map(_._2).sum().show()
    for { m <- mapredHBase } {
      log.info(s"${m._1} ${m._2}")
      /*
      Sample output (the ":116" lines show the old substring(0,12) parsing: a truncated timestamp and a leading ':' on the dimension):
      2017-12-18 10:18:04 INFO com.bigindex.batch.hbase.StatsCount$:101 - 1488322800000:DK_LEI_6000 Map(a -> Map(CALYPSO -> 1, MUREX -> 2, TRS -> 3), l -> Map(CONTRACT -> 4, RECONCILIATION -> 2), p -> Map(BOND -> 6))
      2017-12-18 10:18:04 INFO com.bigindex.batch.hbase.StatsCount$:116 - timestamp 148832280000 dimension :DK_LEI_6000
      2017-12-18 10:18:04 INFO com.bigindex.batch.hbase.StatsCount$:101 - 1488322800000:DK_LEI_6100 Map(a -> Map(CALYPSO -> 1, MUREX -> 2, TRS -> 2), l -> Map(CONTRACT -> 3, RECONCILIATION -> 2), p -> Map(REPO -> 5))
      2017-12-18 10:18:04 INFO com.bigindex.batch.hbase.StatsCount$:116 - timestamp 148832280000 dimension :DK_LEI_6100
      2017-12-18 10:18:04 INFO com.bigindex.batch.hbase.StatsCount$:101 - 1488322800000:DK_LEI_6200 Map(a -> Map(CALYPSO -> 2, MUREX -> 4, TRS -> 4), l -> Map(CONTRACT -> 6, RECONCILIATION -> 4), p -> Map(BOND -> 5, REPO -> 5))
      2017-12-18 10:18:04 INFO com.bigindex.batch.hbase.StatsCount$:116 - timestamp 148832280000 dimension :DK_LEI_6200
      2017-12-18 10:18:04 INFO com.bigindex.batch.hbase.StatsCount$:101 - 1488322800000:DK_LEI_6300 Map(a -> Map(CALYPSO -> 1, MUREX -> 2, TRS -> 2), l -> Map(CONTRACT -> 3, RECONCILIATION -> 2), p -> Map(BOND -> 5))
      2017-12-18 10:18:04 INFO com.bigindex.batch.hbase.StatsCount$:116 - timestamp 148832280000 dimension :DK_LEI_6300
      2017-12-18 10:18:04 INFO com.bigindex.batch.hbase.StatsCount$:101 - 1488322800000:NO_LEI_2000 Map(a -> Map(CALYPSO -> 1, MUREX -> 2, TRS -> 2), l -> Map(CONTRACT -> 3, RECONCILIATION -> 2), p -> Map(REPO -> 5))
      */
      // Row keys look like "1488322800000:DK_LEI_6000": a 13-digit epoch-millis prefix, a ':' separator,
      // then the dimension key. Split at the separator instead of using hard-coded offsets;
      // the original substring(0,12)/substring(13) was off by one (see the ":116" lines above).
      val key: String = m._1
      val sep = key.indexOf(':')
      val timestamp: String = key.substring(0, sep)
      val dimension = key.substring(sep + 1)
      log.info(s"timestamp ${timestamp} dimension ${dimension}")
    }
    ??? // TODO: map the decoded rows to StatsCount and return an RDD[StatsCount]
  }
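
  /** Restricts the scan to the [start, end) row range. Row keys begin with a fixed-width
   *  13-digit epoch-millis prefix, so lexicographic row-key order matches chronological order. */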
  private[hbase] def createHBaseScan(start: Instant, end: Instant, conf: Configuration): Configuration = {
    if (start != null) conf.set(TableInputFormat.SCAN_ROW_START, String.valueOf(start.toEpochMilli))
    if (end != null) conf.set(TableInputFormat.SCAN_ROW_STOP, String.valueOf(end.toEpochMilli))
    conf
  }
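
  /** Stub: mapping a raw HBase Result to a StatsCount is not implemented yet; the values
   *  below are placeholders. */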
  def fromResult(result: Result): StatsCount = {
    StatsCount(1, 1, 1, "dyh", "asf")
  }
}
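
A minimal usage sketch (not part of the original gist) showing how loadDs might be wired up. The local SparkSession, the plain Hadoop Configuration, the "lei" dimension name (selecting the bigindex.stats_lei_count table) and the one-day window are all assumptions for illustration. Since getStatsCountData still ends in ???, running this throws scala.NotImplementedError until that TODO is filled in.

import java.time.Instant
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import com.bigindex.batch.hbase.StatsCount

object StatsCountExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical local setup; a real job would get these from its surrounding application.
    implicit val spark: SparkSession = SparkSession.builder()
      .appName("stats-count-example")
      .master("local[*]")
      .getOrCreate()
    val conf = new Configuration()

    // One-day window matching the row-key prefix scheme (epoch millis).
    val start = Instant.parse("2017-03-01T00:00:00Z")
    val end   = start.plusSeconds(24 * 60 * 60)

    // "lei" is a made-up dimension name; it resolves to the bigindex.stats_lei_count table.
    val ds = StatsCount.loadDs(start, end, conf, curdim = "lei")
    ds.show(20, truncate = false)
  }
}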