@ishassan · Created April 4, 2016
resolvers ++= Seq(
  "Hadoop Releases" at "https://repository.cloudera.com/content/repositories/releases/"
)

libraryDependencies ++= Seq(
  "com.google.guava" % "guava" % "15.0",
  "org.apache.hadoop" % "hadoop-common" % "2.6.0",
  "org.apache.hbase" % "hbase-common" % "1.0.0",
  "org.apache.hbase" % "hbase-client" % "1.0.0",
  "org.apache.hbase" % "hbase-protocol" % "1.0.0",
  "com.cloudera" % "spark-hbase" % "0.0.2-clabs" excludeAll (
    ExclusionRule("org.mortbay.jetty")
  )
)

dependencyOverrides += "com.google.guava" % "guava" % "15.0"
assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
  case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
  case PathList("org", "apache", xs @ _*) => MergeStrategy.last
  case PathList("com", "google", xs @ _*) => MergeStrategy.last
  case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
  case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
  case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
  case "about.html" => MergeStrategy.rename
  case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
  case "META-INF/mailcap" => MergeStrategy.last
  case "META-INF/mimetypes.default" => MergeStrategy.last
  case "plugin.properties" => MergeStrategy.last
  case "log4j.properties" => MergeStrategy.last
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
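The assemblyMergeStrategy key above comes from the sbt-assembly plugin, which this build assumes is already declared. A minimal project/plugins.sbt for an sbt 0.13-era build could look like the following (the plugin version is an assumption, not part of the original gist):

// project/plugins.sbt -- pulls in sbt-assembly so the merge strategy compiles
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.2")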
/*
For more examples, please refer to https://github.com/cloudera-labs/SparkOnHBase/tree/cdh5-0.0.2/java/com/cloudera/spark/hbase/example
*/
package testpackage
import com.cloudera.spark.hbase.HBaseContext
import com.typesafe.config.Config
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Get, Result, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import spark.jobserver._
// A single HBase cell: column family, qualifier, and value, all as raw bytes.
case class Column(family: Array[Byte], qualifier: Array[Byte], value: Array[Byte])

// One row to read or write: its row key plus the cells it carries.
case class Record(rowKey: Array[Byte], colList: Array[Column])
object BulkPutAndGetExample extends SparkJob {

  // No config is required for this example, so any input validates.
  override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
    SparkJobValid
  }

  override def runJob(sc: SparkContext, config: Config): Any = {
    val conf: Configuration = HBaseConfiguration.create()
    val ZOOKEEPER_QUORUM = "WRITE YOUR ZOOKEEPER_QUORUM"
    conf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM)
    val hbaseContext = new HBaseContext(sc, conf)

    // Put example: write three rows, each carrying a "name" and a "phone"
    // cell in column family "d".
    val putList = Array(
      Record(Bytes.toBytes("6"), Array(
        Column(Bytes.toBytes("d"), Bytes.toBytes("name"), Bytes.toBytes("name6")),
        Column(Bytes.toBytes("d"), Bytes.toBytes("phone"), Bytes.toBytes("phone6"))
      )),
      Record(Bytes.toBytes("7"), Array(
        Column(Bytes.toBytes("d"), Bytes.toBytes("name"), Bytes.toBytes("name7")),
        Column(Bytes.toBytes("d"), Bytes.toBytes("phone"), Bytes.toBytes("phone7"))
      )),
      Record(Bytes.toBytes("8"), Array(
        Column(Bytes.toBytes("d"), Bytes.toBytes("name"), Bytes.toBytes("name8")),
        Column(Bytes.toBytes("d"), Bytes.toBytes("phone"), Bytes.toBytes("phone8"))
      ))
    )

    val putRdd = sc.parallelize(putList)
    // Arguments: the RDD, the target table, a Record => Put converter,
    // and the autoFlush flag.
    hbaseContext.bulkPut(putRdd, "ishassan:test_table", putFunction, true)

    // Get example: fetch the same three rows back and rebuild a Record from
    // each HBase Result.
    val getList = Array(
      Bytes.toBytes("6"),
      Bytes.toBytes("7"),
      Bytes.toBytes("8")
    )

    val getRdd = sc.parallelize(getList)
    val recordRdd = hbaseContext.bulkGet("ishassan:test_table", 1, getRdd, makeGet, result => {
      val cells = result.rawCells()
      val rowKey = result.getRow()
      val colList: Array[Column] = new Array[Column](cells.length)
      for (i <- 0 until cells.length) {
        val cell = cells(i)
        // Cell buffers may be shared, so copy the bytes out with CellUtil.
        val colFamily = CellUtil.cloneFamily(cell)
        val colQualifier = CellUtil.cloneQualifier(cell)
        val colValue = CellUtil.cloneValue(cell)
        colList(i) = Column(colFamily, colQualifier, colValue)
      }
      Record(rowKey, colList)
    })

    // Print what came back: one "rowKey: family qualifier value" line per cell.
    recordRdd.collect().foreach(record => {
      print(Bytes.toString(record.rowKey) + ": ")
      record.colList.foreach(col => {
        print(Bytes.toString(col.family) + " ")
        print(Bytes.toString(col.qualifier) + " ")
        print(Bytes.toString(col.value) + " ")
        println()
      })
    })

    println("Done")
    1 // the job's result value, returned to spark-jobserver
  }

  // Turns a Record into the HBase Put that bulkPut will send.
  def putFunction(record: Record): Put = {
    val put = new Put(record.rowKey)
    record.colList.foreach(col => {
      put.addColumn(col.family, col.qualifier, col.value)
    })
    put
  }

  // Turns a raw row key into the HBase Get that bulkGet will issue.
  def makeGet(rowKey: Array[Byte]): Get = {
    new Get(rowKey)
  }
}
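For ad-hoc testing outside spark-jobserver, the job can also be driven from a plain main method in the same file. The runner below is a sketch, not part of the original gist: the local[2] master and the empty Config are illustrative, and it still assumes the ZooKeeper quorum is filled in and the ishassan:test_table table exists.

object LocalRunner {
  def main(args: Array[String]): Unit = {
    // Assumption: running locally for a smoke test; adjust the master as needed.
    val sparkConf = new org.apache.spark.SparkConf()
      .setAppName("BulkPutAndGetExample")
      .setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    try {
      // runJob ignores its Config in this example, so an empty one is enough.
      val config = com.typesafe.config.ConfigFactory.empty()
      BulkPutAndGetExample.runJob(sc, config)
    } finally {
      sc.stop()
    }
  }
}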
name := "SparkHBase"
version := "1.0"
scalaVersion := "2.10.4"
exportJars := true
libraryDependencies += "spark.jobserver" %% "job-server" % "0.6.1" % "provided"