Last active
October 31, 2017 18:32
-
-
Save rcketscientist/51fa8045e8507bd52895f5f42ebff37d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.keysight.hbase | |
import org.apache.hadoop.hbase.HBaseConfiguration | |
import org.apache.hadoop.hbase.spark.HBaseContext | |
import org.apache.spark.sql.{DataFrame, SQLContext} | |
import org.apache.spark.{SparkConf, SparkContext} | |
/**
 * Benchmark driver: maps two HBase tables ("sensorData" and "sensor") into
 * Spark SQL temp tables via the hbase-spark connector, then runs a fixed
 * sequence of analytic queries, timing each one and printing (a capped number
 * of) result rows to stdout.
 */
object AverageSensorOverTime1 /*extends App*/ {
  def main(args: Array[String]): Unit = {
    // TODO: This shares a lot of code with https://bitbucket.it.keysight.com/projects/ATL/repos/data-management/browse/pipeline/core/DistributedQueryEngine/AssetAnalytics/src/main/scala/com/keysight/spark/sql/analytics/pipeline/KuduTimeSeries.scala
    val sparkConf = new SparkConf().setAppName("AverageSensorOverTime")
    val sparkContext = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    // Constructing HBaseContext registers it with the connector's data source;
    // it is required even though it is never referenced again below.
    val hbaseContext = new HBaseContext(sparkContext, conf)
    val sqlContext = new SQLContext(sparkContext)

    val sensorDataName = "sensorData"
    val sensorName = "sensor"
    val sensorDataTempTable = "sensor_data" // No camelcase?
    val sensorTempTable = "sensor"

    // NOTE(review): measuretime is mapped as BIGINT below but these bounds are
    // string timestamps; the range predicates in queries 1 and 2 may therefore
    // never match. Confirm how cf:timestamp is encoded in HBase.
    val startTime = "2017-12-24 23:50:00"
    val endTime = "2017-12-25 00:20:00"

    /**
     * Loads an HBase table as a DataFrame through the hbase-spark data source.
     *
     * @param mapping the "hbase.columns.mapping" string (SQL col, type, cf:qualifier triples)
     * @param table   the HBase table name
     */
    def withCatalog(mapping: String, table: String): DataFrame =
      sqlContext.read
        .format("org.apache.hadoop.hbase.spark")
        .options(Map(
          "hbase.columns.mapping" -> mapping,
          "hbase.table" -> table))
        .load()

    // Column mapping for the raw sensor-reading table.
    val catSensorData =
      "KEY_FIELD STRING :key," +
      "companyid STRING cf:companyId," +
      "siteid STRING cf:siteId," +
      "assetid STRING cf:equipmentId," +
      "moduleid STRING cf:moduleId," +
      "cardid STRING cf:cardId," +
      "partid STRING cf:partId," +
      "sensortype STRING cf:sensorType," +
      "measuretime BIGINT cf:timestamp," +
      "value STRING cf:value"
    val sensorData = withCatalog(catSensorData, sensorDataName)

    // Column mapping for the sensor-metadata table.
    // Fixed: the original declared "siteid" twice and used "cardId" while every
    // other column (and the SQL below) is all-lowercase.
    val catSensor =
      "KEY_FIELD STRING :key," +
      "device STRING cf:device," +
      "siteid STRING cf:siteId," +
      "channelnumber STRING cf:channelNumber," +
      "companyid STRING cf:companyId," +
      "assetid STRING cf:equipmentId," +
      "moduleid STRING cf:moduleId," +
      "cardid STRING cf:cardId," +
      "partid STRING cf:partId," +
      "sensortypeid BIGINT cf:sensorTypeId," +
      "sensortype STRING cf:sensorType," +
      "lowerthreshold FLOAT cf:lowerThreshold," +
      "upperthreshold FLOAT cf:upperThreshold"
    val sensor = withCatalog(catSensor, sensorName)

    // Register the HBase-backed DataFrames as Spark SQL temp tables.
    println("Generating Spark SQL temp tables.")
    Util.time_in_ms {
      sensor.registerTempTable(sensorTempTable)
      sensorData.registerTempTable(sensorDataTempTable)
    }
    println("Temp tables registered.")

    // (description, SQL, max rows to print; 0 = print all)
    val queries = Seq(
      ( "Query 1: Sensor readings within a narrow time range",
        s"""SELECT *, cast(value as float)
             FROM $sensorDataTempTable
             WHERE measuretime >= "$startTime"
               AND measuretime < "$endTime" """, 10 ),
      ( "Query 2: Aggregates of sensor readings in that same time range",
        s"""SELECT count(*), avg(cast(value as float))
             FROM $sensorDataTempTable
             WHERE measuretime >= "$startTime"
               AND measuretime < "$endTime" """, 0 ),
      ( "Query 3: Identify average sensor readings for a specific device (no join yet)",
        s"""select sensortype, moduleid, count(sensortype), avg(cast(value as float))
             from $sensorDataTempTable
             where assetid = "IPCICT29.CORP.JABIL.ORG"
             group by sensortype, moduleid
             order by sensortype, moduleid """, 0 ),
      // NOTE(review): identical SQL to the previous entry — possibly a
      // deliberate warm-cache re-run; kept as-is. Confirm and either label it
      // so, or drop it.
      ( "Query 3: Get Average sensor readings for a specific device (no join yet)",
        s"""select sensortype, moduleid, count(sensortype), avg(cast(value as float))
             from $sensorDataTempTable
             where assetid = "IPCICT29.CORP.JABIL.ORG"
             group by sensortype, moduleid
             order by sensortype, moduleid """, 0 ),
      ( "Query 4: Get Average sensor readings for devices on higher channel numbers, as identified by a join with sensor table",
        s"""select sd.assetid, s.channelnumber, sd.sensortype, sd.moduleid, count(sd.sensortype), avg(cast(value as float))
             from $sensorDataTempTable sd
             inner join $sensorTempTable s on s.companyid = sd.companyid and s.siteid = sd.siteid
               and s.assetid = sd.assetid and s.moduleid = sd.moduleid
               and s.partid = sd.partid and s.cardid = sd.cardid and s.sensortype = sd.sensortype
             where cast(s.channelnumber as bigint) > 249
             group by sd.assetid, s.channelnumber, sd.sensortype, sd.moduleid
             order by sd.assetid, cast(s.channelnumber as bigint), sd.sensortype, sd.moduleid """, 0),
      ( "Query 5: Determine months where sensor readings are out-of-bounds for the 3.3V sensor (channel 254, joined)",
        s"""select sd.assetid, sd.sensortype, month(sd.measuretime) as formonth, count(sd.sensortype), avg(cast(value as float))
             from $sensorDataTempTable sd
             inner join $sensorTempTable s
               on s.companyid = sd.companyid
               and s.siteid = sd.siteid
               and s.assetid = sd.assetid
               and s.moduleid = sd.moduleid
               and s.partid = sd.partid
               and s.cardid = sd.cardid
               and s.sensortype = sd.sensortype
             where cast(s.channelnumber as bigint) = 254
             group by sd.assetid, sd.sensortype, month(sd.measuretime)
             having avg(cast(value as float)) > 3.3002 or avg(cast(value as float)) < 3.2998
             order by sd.assetid, sd.sensortype, formonth """, 0),
      ( "Query 6: Determine months where sensor readings are out-of-bounds for the 5-Volt sensor (channel 255, joined)",
        // Fixed: the original ordered by "_c3" (the positional alias of
        // count(sd.sensortype)); the parallel Query 5 orders by formonth.
        s"""select sd.assetid, sd.sensortype, month(sd.measuretime) as formonth, count(sd.sensortype), avg(cast(value as float))
             from $sensorDataTempTable sd
             inner join $sensorTempTable s
               on s.companyid = sd.companyid
               and s.siteid = sd.siteid
               and s.assetid = sd.assetid
               and s.moduleid = sd.moduleid
               and s.partid = sd.partid
               and s.cardid = sd.cardid
               and s.sensortype = sd.sensortype
             where cast(s.channelnumber as bigint) = 255
             group by sd.assetid, sd.sensortype, month(sd.measuretime)
             having avg(cast(value as float)) > 5.0002 or avg(cast(value as float)) < 4.9998
             order by sd.assetid, sd.sensortype, formonth """, 0)
    )

    // Run each query, time the collect, and print up to maxShow rows
    // (maxShow == 0 means print everything).
    queries.foreach { case (description, query, maxShow) =>
      println("Running query: " + description)
      println(query)
      val rows = Util.time_in_ms {
        sqlContext.sql(query).collect()
      }
      val toShow =
        if (maxShow > 0) {
          println("Showing up to " + maxShow + " results.")
          rows.take(maxShow)
        } else rows
      toShow.foreach(println)
    }

    sparkContext.stop()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment