Smart Thermostat on AWS with Spark - Spark cluster part: retrieve temperature scenarios
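The listing below calls a convertScanToString helper that is never defined in the snippet. A minimal sketch of the usual implementation against the HBase 1.x client API is shown here up front; TableInputFormat reads the Scan from its SCAN property as a Base64-encoded protobuf string, and ProtobufUtil and Base64 are the stock HBase utility classes. This is an assumption about the missing helper, not code from the original (newer HBase releases expose a public equivalent in org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil):

import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64

//assumed helper: serialize a Scan into the Base64-encoded protobuf string
//that TableInputFormat reads from its SCAN configuration property
def convertScanToString(scan: Scan): String =
  Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray)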
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

//spark settings
val sparkconf = new SparkConf().setAppName("tempcluster")
val sc = new SparkContext(sparkconf)

//scan the hbase used-scenario table to collect used scenarios that have been scored
val confonoff = HBaseConfiguration.create()
val scanonoff = new Scan()
val family = Bytes.toBytes("fd")
scanonoff.addColumn(family, Bytes.toBytes("outtempdif"))
scanonoff.addColumn(family, Bytes.toBytes("tempdif"))
scanonoff.addColumn(family, Bytes.toBytes("score"))
scanonoff.addColumn(family, Bytes.toBytes("scenariokey"))
scanonoff.setStopRow(Bytes.toBytes("40b5af01_9999999999_40b5af01_9999999999_9999999"))
confonoff.set(TableInputFormat.INPUT_TABLE, "husedscenario")
confonoff.set(TableInputFormat.SCAN, convertScanToString(scanonoff))
//create an RDD of HBase Results for the used scenarios
val RDDonOff = sc.newAPIHadoopRDD(confonoff, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

//keep only scored scenarios, keyed by scenariokey with (outtempdif, tempdif, score) as the value
val valRowsOnOff = RDDonOff
  .map(_._2)
  .filter(result => result.getValue(family, Bytes.toBytes("score")) != null) //only select scored scenarios
  .map { result =>
    def str(qual: String) = Bytes.toString(result.getValue(family, Bytes.toBytes(qual)))
    (str("scenariokey"), (str("outtempdif").toInt, str("tempdif").toInt, str("score").toLong))
  }
valRowsOnOff.cache() //cache so later stages can reuse it without re-reading HBase
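Once cached, the pair RDD keyed by scenariokey can feed whatever aggregation comes next. As a purely illustrative follow-up (hypothetical, not part of the original snippet), averaging the score per scenario key might look like:

//hypothetical example: average score per scenario key
val avgScorePerScenario = valRowsOnOff
  .mapValues { case (_, _, score) => (score, 1L) }
  .reduceByKey { case ((s1, n1), (s2, n2)) => (s1 + s2, n1 + n2) }
  .mapValues { case (sum, n) => sum.toDouble / n }
avgScorePerScenario.take(5).foreach(println)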