Skip to content

Instantly share code, notes, and snippets.

@akhld
Created June 4, 2015 10:35
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akhld/a4e46bdc832218486acb to your computer and use it in GitHub Desktop.
Save akhld/a4e46bdc832218486acb to your computer and use it in GitHub Desktop.
Spark Streaming with HBase
val push_hbase = aggregatedStream.transform(rdd => {
val hbaseTableName = "global_aggregate"
val hbaseColumnName = "aggregate"
//Creates the HBase confs
val hconf = HBaseConfiguration.create()
hconf.set("hbase.zookeeper.quorum", "sigmoid-machine1,sigmoid-machine2,sigmoid-machine3,sigmoid-machine4")
hconf.set("hbase.zookeeper.property.clientPort", "2181")
hconf.set("hbase.defaults.for.version.skip", "true")
hconf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName)
hconf.setClass("mapreduce.job.outputformat.class", classOf[TableOutputFormat[String]], classOf[OutputFormat[String, Mutation]])
val admin = new HBaseAdmin(hconf)
//If table already exists, then lets read back and update it
if (admin.isTableAvailable(hbaseTableName)) {
hconf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)
hconf.set(TableInputFormat.SCAN_COLUMNS, "CF:" + hbaseColumnName + " CF:batch")
val check = ssc.sparkContext.newAPIHadoopRDD(hconf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]).map {
case (key, row) => {
val rowkey = Bytes.toString(key.get())
val valu = Bytes.toString(row.getValue(Bytes.toBytes("CF"), Bytes.toBytes(hbaseColumnName)))
(rowkey, valu.toInt)
}
}
//Lets union the stream data with previous hbase data.
val jdata = (check ++ rdd).reduceByKey(_ + _)
jdata.map(valu => (new ImmutableBytesWritable, {
val record = new Put(Bytes.toBytes(valu._1))
record.add(Bytes.toBytes("CF"), Bytes.toBytes(hbaseColumnName), Bytes.toBytes(valu._2.toString))
record
}
)
).saveAsNewAPIHadoopDataset(hconf)
jdata
//Let’s create the table and push the data into it.
}else{
val tab = TableName.valueOf(hbaseTableName)
val tabledesc = new HTableDescriptor(tab)
tabledesc.addFamily(new HColumnDescriptor("CF"))
admin.createTable(tabledesc)
rdd.map(valu => (new ImmutableBytesWritable, {
val record = new Put(Bytes.toBytes(valu._1))
record.add(Bytes.toBytes("CF"), Bytes.toBytes(hbaseColumnName), Bytes.toBytes(valu._2.toString))
record
}
)
).saveAsNewAPIHadoopDataset(hconf)
rdd
}
})
push_hbase.count().print()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment