Skip to content

Instantly share code, notes, and snippets.

@rcketscientist
Last active October 31, 2017 18:32
Show Gist options
  • Save rcketscientist/51fa8045e8507bd52895f5f42ebff37d to your computer and use it in GitHub Desktop.
package com.keysight.hbase
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Benchmark harness: maps two HBase tables (sensor metadata and raw sensor
 * readings) into Spark SQL temp tables, then times a fixed suite of analytic
 * queries against them, printing each result set to stdout.
 *
 * Fixes relative to the previous revision:
 *  - removed the duplicated `siteid` entry in the sensor catalog mapping
 *  - normalized `cardId` -> `cardid` so the join predicate `s.cardid = sd.cardid`
 *    resolves consistently regardless of resolver case sensitivity
 *  - Query 6 now orders by the `formonth` alias (the old `_c3` pointed at the
 *    COUNT column, not the month, unlike the parallel Query 5) and qualifies
 *    `sd.measuretime` in the GROUP BY to avoid ambiguity after the join
 *  - side-effect-only `map` calls replaced with `foreach`; `var rows` removed
 *  - superseded commented-out query variants (5i-5iii and the trailing scratch
 *    block) deleted; they remain in revision history
 */
object AverageSensorOverTime1 /*extends App*/ {

  def main(args: Array[String]): Unit = {
    // TODO: This shares a lot of code with https://bitbucket.it.keysight.com/projects/ATL/repos/data-management/browse/pipeline/core/DistributedQueryEngine/AssetAnalytics/src/main/scala/com/keysight/spark/sql/analytics/pipeline/KuduTimeSeries.scala
    val sparkConf = new SparkConf().setAppName("AverageSensorOverTime")
    val sparkContext = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    // Side effect: registers HBase connection/config with the Spark context so
    // the "org.apache.hadoop.hbase.spark" relation provider below can find it.
    val hbaseContext = new HBaseContext(sparkContext, conf)
    val sqlContext = new SQLContext(sparkContext)

    val sensorDataName = "sensorData"
    val sensorName = "sensor"
    val sensorDataTempTable = "sensor_data" // temp tables use snake_case here
    val sensorTempTable = "sensor"

    // Fixed time window used by queries 1 and 2.
    // NOTE(review): `measuretime` is declared BIGINT in the catalog below but is
    // compared against these string literals -- confirm the stored format.
    val startTime = "2017-12-24 23:50:00"
    val endTime = "2017-12-25 00:20:00"

    /** Loads an HBase table as a DataFrame via the given columns-mapping string. */
    def withCatalog(cat: String, table: String): DataFrame =
      // sqlContext.load is deprecated in later Spark versions; kept for the
      // Spark release this job targets.
      sqlContext.load("org.apache.hadoop.hbase.spark",
        Map("hbase.columns.mapping" -> cat,
            "hbase.table" -> table))

    // Catalog for the readings table. All SQL-side names are lowercase on purpose:
    // the queries below reference them in lowercase.
    val catSensorData =
      "KEY_FIELD STRING :key," +
      "companyid STRING cf:companyId," +
      "siteid STRING cf:siteId," +
      "assetid STRING cf:equipmentId," +
      "moduleid STRING cf:moduleId," +
      "cardid STRING cf:cardId," +
      "partid STRING cf:partId," +
      "sensortype STRING cf:sensorType," +
      "measuretime BIGINT cf:timestamp," +
      "value STRING cf:value"
    val sensorData = withCatalog(catSensorData, sensorDataName)

    // Catalog for the sensor metadata table.
    // Fixed: the previous revision mapped `siteid` twice and used camelCase
    // `cardId` while every join predicate says `s.cardid`.
    val catSensor =
      "KEY_FIELD STRING :key," +
      "device STRING cf:device," +
      "siteid STRING cf:siteId," +
      "channelnumber STRING cf:channelNumber," +
      "companyid STRING cf:companyId," +
      "assetid STRING cf:equipmentId," +
      "moduleid STRING cf:moduleId," +
      "cardid STRING cf:cardId," +
      "partid STRING cf:partId," +
      "sensortypeid BIGINT cf:sensorTypeId," +
      "sensortype STRING cf:sensorType," +
      "lowerthreshold FLOAT cf:lowerThreshold," +
      "upperthreshold FLOAT cf:upperThreshold"
    val sensor = withCatalog(catSensor, sensorName)

    // Register the HBase-backed DataFrames as Spark SQL temp tables.
    // (registerTempTable is the pre-2.0 API; kept for compatibility.)
    println("Generating Spark SQL temp tables.")
    Util.time_in_ms {
      sensor.registerTempTable(sensorTempTable)
      sensorData.registerTempTable(sensorDataTempTable)
    }
    println("Temp tables registered.")

    // (description, SQL, max rows to print; 0 = print nothing).
    val queries = Seq(
      ( "Query 1: Sensor readings within a narrow time range",
        s"""SELECT *, cast(value as float)
            FROM $sensorDataTempTable
            WHERE measuretime >= "$startTime"
            AND measuretime < "$endTime" """, 10 ),
      ( "Query 2: Aggregates of sensor readings in that same time range",
        s"""SELECT count(*), avg(cast(value as float))
            FROM $sensorDataTempTable
            WHERE measuretime >= "$startTime"
            AND measuretime < "$endTime" """, 0 ),
      ( "Query 3: Identify average sensor readings for a specific device (no join yet)",
        s"""select sensortype, moduleid, count(sensortype), avg(cast(value as float))
            from $sensorDataTempTable
            where assetid = "IPCICT29.CORP.JABIL.ORG"
            group by sensortype, moduleid
            order by sensortype, moduleid """, 0 ),
      // NOTE(review): this is an exact rerun of the previous query -- presumably
      // to measure warm-cache timing. Confirm, otherwise delete one of the pair.
      ( "Query 3: Get Average sensor readings for a specific device (no join yet)",
        s"""select sensortype, moduleid, count(sensortype), avg(cast(value as float))
            from $sensorDataTempTable
            where assetid = "IPCICT29.CORP.JABIL.ORG"
            group by sensortype, moduleid
            order by sensortype, moduleid """, 0 ),
      ( "Query 4: Get Average sensor readings for devices on higher channel numbers, as identified by a join with sensor table",
        s"""select sd.assetid, s.channelnumber, sd.sensortype, sd.moduleid, count(sd.sensortype), avg(cast(value as float))
            from $sensorDataTempTable sd
            inner join $sensorTempTable s on s.companyid = sd.companyid and s.siteid = sd.siteid
            and s.assetid = sd.assetid and s.moduleid = sd.moduleid
            and s.partid = sd.partid and s.cardid = sd.cardid and s.sensortype = sd.sensortype
            where cast(s.channelnumber as bigint) > 249
            group by sd.assetid, s.channelnumber, sd.sensortype, sd.moduleid
            order by sd.assetid, cast(s.channelnumber as bigint), sd.sensortype, sd.moduleid """, 0),
      ( "Query 5: Determine months where sensor readings are out-of-bounds for the 3.3V sensor (channel 254, joined)",
        s"""select sd.assetid, sd.sensortype, month(sd.measuretime) as formonth, count(sd.sensortype), avg(cast(value as float))
            from $sensorDataTempTable sd
            inner join $sensorTempTable s
            on s.companyid = sd.companyid
            and s.siteid = sd.siteid
            and s.assetid = sd.assetid
            and s.moduleid = sd.moduleid
            and s.partid = sd.partid
            and s.cardid = sd.cardid
            and s.sensortype = sd.sensortype
            where cast(s.channelnumber as bigint) = 254
            group by sd.assetid, sd.sensortype, month(sd.measuretime)
            having avg(cast(value as float)) > 3.3002 or avg(cast(value as float)) < 3.2998
            order by sd.assetid, sd.sensortype, formonth """, 0),
      ( "Query 6: Determine months where sensor readings are out-of-bounds for the 5-Volt sensor (channel 255, joined)",
        // Fixed: previously ordered by `_c3` (the COUNT column) and grouped by
        // unqualified `measuretime`; now mirrors Query 5 exactly.
        s"""select sd.assetid, sd.sensortype, month(sd.measuretime) as formonth, count(sd.sensortype), avg(cast(value as float))
            from $sensorDataTempTable sd
            inner join $sensorTempTable s
            on s.companyid = sd.companyid
            and s.siteid = sd.siteid
            and s.assetid = sd.assetid
            and s.moduleid = sd.moduleid
            and s.partid = sd.partid
            and s.cardid = sd.cardid
            and s.sensortype = sd.sensortype
            where cast(s.channelnumber as bigint) = 255
            group by sd.assetid, sd.sensortype, month(sd.measuretime)
            having avg(cast(value as float)) > 5.0002 or avg(cast(value as float)) < 4.9998
            order by sd.assetid, sd.sensortype, formonth """, 0)
    )

    // Run and time each query; print at most maxShow rows (0 suppresses output).
    queries.foreach { case (desc, query, maxShow) =>
      println("Running query: " + desc)
      println(query)
      val allRows = Util.time_in_ms {
        sqlContext.sql(query).collect()
      }
      val rows =
        if (maxShow > 0) {
          println("Showing up to " + maxShow + " results.")
          allRows.take(maxShow)
        } else allRows
      rows.foreach(println)
    }

    sparkContext.stop()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment