Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save dgadiraju/ba9c9b3c59cddb743194f3da38aee370 to your computer and use it in GitHub Desktop.
Save dgadiraju/ba9c9b3c59cddb743194f3da38aee370 to your computer and use it in GitHub Desktop.
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
/**
 * Spark Structured Streaming job that counts department page hits and stores
 * the counts in the HBase table `department_count`.
 *
 * Log lines are read from the Kafka topic `retail`. Token 6 (0-based) of each
 * space-separated line is treated as the request path; lines whose path is
 * `/department/<name>/...` are counted per `<name>` over tumbling 20-second
 * windows of the record's `timestamp` column.
 *
 * Created by itversity on 22/08/18.
 */
object GetStreamingDepartmentTrafficHBase {
  import org.apache.hadoop.hbase.client.Table

  /**
   * Entry point.
   *
   * @param args args(0) names the section of the Typesafe config
   *             (`ConfigFactory.load`) that holds `execution.mode`,
   *             `bootstrap.servers`, `zookeeper.quorum` and `zookeeper.port`.
   */
  def main(args: Array[String]): Unit = {
    require(args.nonEmpty, "Usage: GetStreamingDepartmentTrafficHBase <config-section>")
    val conf = ConfigFactory.load.getConfig(args(0))

    // Resolve every setting on the driver: a missing key fails at startup, and
    // the writer shipped to executors only captures plain strings.
    val executionMode = conf.getString("execution.mode")
    val bootstrapServers = conf.getString("bootstrap.servers")
    val zkQuorum = conf.getString("zookeeper.quorum")
    val zkPort = conf.getString("zookeeper.port")

    val spark = SparkSession.
      builder.
      appName("Get Streaming Department Traffic").
      master(executionMode).
      getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    spark.conf.set("spark.sql.shuffle.partitions", "2")
    import spark.implicits._

    // NOTE(review): "includeTimestamp" looks like a socket-source option; the
    // Kafka source already exposes a `timestamp` column, so it likely has no effect.
    val lines = spark.
      readStream.
      format("kafka").
      option("kafka.bootstrap.servers", bootstrapServers).
      option("subscribe", "retail").
      option("includeTimestamp", true).
      load().
      selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp").
      as[(String, String, Timestamp)]

    // "/"-separated segments of the request path (token 6 of the log line):
    // for "/department/<name>/..." index 1 is "department" and index 2 is <name>.
    val pathSegments = split(split($"value", " ")(6), "/")

    val departmentTraffic = lines.
      where(pathSegments(1) === "department").
      withColumn("department_name", pathSegments(2)).
      // A bare "/department" path has no name segment; a null group would make
      // the writer fail while building the row key and kill the query.
      where($"department_name".isNotNull).
      groupBy(
        window($"timestamp", "20 seconds", "20 seconds"), $"department_name"
      ).agg(count("value").alias("department_count"))

    val query = departmentTraffic.writeStream.
      foreach(new DepartmentCountHBaseWriter(zkQuorum, zkPort)).
      outputMode("update").
      trigger(Trigger.ProcessingTime("20 seconds")).
      start()
    query.awaitTermination()
  }

  /**
   * Writes (window, department_name, department_count) rows into the HBase
   * table `department_count`.
   *
   * Row key is `<yyyy-MM-dd>:<department_name>`. The column in family `cf` is
   * the processing-time minute `yyyy-MM-dd HH:mm`. Values are decimal strings,
   * and an incoming count is added to any value already stored in that cell.
   *
   * NOTE(review): in "update" output mode a window that receives data in two
   * triggers is emitted twice with its cumulative count, so adding to the
   * stored value can double-count. The get-then-put is also not atomic if two
   * partitions update the same department in one batch. Fixing either changes
   * the stored layout (e.g. key by window start and overwrite), so it is left
   * as is.
   *
   * @param zkQuorum ZooKeeper quorum of the HBase cluster
   * @param zkPort   ZooKeeper client port
   */
  private class DepartmentCountHBaseWriter(zkQuorum: String, zkPort: String)
    extends ForeachWriter[Row] {

    private val columnFamily = Bytes.toBytes("cf")

    // Created per partition in open(); never serialized with the writer.
    @transient private var connection: Connection = _
    @transient private var table: Table = _

    override def open(partitionId: Long, version: Long): Boolean = {
      val hBaseConf: Configuration = HBaseConfiguration.create()
      hBaseConf.set("hbase.zookeeper.quorum", zkQuorum)
      hBaseConf.set("hbase.zookeeper.property.clientPort", zkPort)
      hBaseConf.set("zookeeper.znode.parent", "/hbase-unsecure")
      hBaseConf.set("hbase.cluster.distributed", "true")
      connection = ConnectionFactory.createConnection(hBaseConf)
      // One Table handle per partition instead of one (never closed) per row.
      table = connection.getTable(TableName.valueOf("department_count"))
      true
    }

    override def process(value: Row): Unit = {
      // A single timestamp keeps the row-key date and the column minute
      // consistent, even for a row processed exactly at midnight.
      val now = new Date()
      val currDate = new SimpleDateFormat("yyyy-MM-dd").format(now)
      val currTime = Bytes.toBytes(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(now))

      val departmentName = value.getString(1)
      val increment = value.getLong(2)
      val rowKey = Bytes.toBytes(currDate + ":" + departmentName)

      // Only the single target cell is needed; getValue yields null when absent.
      val stored = Option(
        table.get(new Get(rowKey).addColumn(columnFamily, currTime)).getValue(columnFamily, currTime)
      )
      val total = stored.fold(increment)(bytes => Bytes.toString(bytes).toLong + increment)

      table.put(new Put(rowKey).addColumn(columnFamily, currTime, Bytes.toBytes(total.toString)))
    }

    override def close(errorOrNull: Throwable): Unit = {
      // open() may have thrown before either handle was assigned; guard so a
      // NullPointerException here does not mask the original failure.
      try {
        if (table != null) table.close()
      } finally {
        if (connection != null) connection.close()
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment