Created
July 31, 2015 10:09
-
-
Save niektemme/0295b0c5a60d839ccbbb to your computer and use it in GitHub Desktop.
Smart Thermostat AWS Spark - part Spark cluster final
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//alternative scenarios get highest score: best score seen so far plus a margin of 10
//(a full descending sortByKey + take(1) was an O(n log n) shuffle just to read one element;
// max() does a single O(n) pass with no shuffle sort)
val maxscore : Int = RDDtotalClustered.map(result => result._2._1).max() + 10
//determine cluster centers (centers are new tempdif and outtempdif)
val RDDcenters = sc.parallelize(clusters.clusterCenters)
//pair each center vector with its cluster id so it can be joined on the cluster key
val RDDcentersCalc = RDDcenters.map(result => (clusters.predict(result) , result ) )
RDDcentersCalc.cache()
//join best scenarios for temperature groups with cluster centers
//re-key by the scenario row id; value is (cluster id, score, rounded outtempdif, rounded tempdif)
val RDDClusterWithCenters = RDDtotalClustered.join(RDDcentersCalc
  ).map(result => (result._2._1._2, (result._1, result._2._1._1, math.round(result._2._2(0)), math.round(result._2._2(1)) ) ))
RDDClusterWithCenters.cache()
//scan the scenario hbase table to retrieve scenario information
//rows use a reverse-timestamp key ("40b5af01_" + (9999999999 - epochSeconds)) so newest rows sort first
val bvepoch: Long = 9999999999L
val vepoch: Long = System.currentTimeMillis / 1000
val startmepoch : Long = bvepoch - vepoch
val vstartrow: String = s"40b5af01_$startmepoch"
//full-table scan of "hactscenario" handed to Spark via TableInputFormat
val confscenvals = HBaseConfiguration.create()
confscenvals.set(TableInputFormat.INPUT_TABLE, "hactscenario")
confscenvals.set(TableInputFormat.SCAN, convertScanToString(new Scan()))
val RDDconfscenvals = sc.newAPIHadoopRDD(
  confscenvals,
  classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
//reshape each scan Result into
//  (rowKey, ((run0..run5), (group, maxscore, outtempdif, tempdif)))
//the injected maxscore marks these rows as the preferred scenarios on a later join
val RDDscenvals = RDDconfscenvals.map(_._2).map { row =>
  //read one string cell from column family "fd"
  def fd(qualifier: String): String =
    Bytes.toString(row.getValue("fd".getBytes(), qualifier.getBytes()))
  ( Bytes.toString(row.getRow()),
    ( ( fd("run0"),
        fd("run1"),
        fd("run2"),
        fd("run3"),
        fd("run4"),
        fd("run5") ),
      ( fd("group"),
        maxscore,
        fd("outtempdif"),
        fd("tempdif") ) ) )
}
RDDscenvals.cache()
//join newly created scenarios with all other scenarios to create complete list of scenarios (best and alternative)
//collect all to a new array
//Fix: destructure the Option from leftOuterJoin with getOrElse instead of
//comparing `== None` and calling the unsafe `.get` (same behavior, no partial op)
val ClusterResSet = RDDscenvals.leftOuterJoin(RDDClusterWithCenters
  ).mapValues { case (scen, clusterOpt) => (scen._1, clusterOpt.getOrElse(scen._2))
  //re-key with the fresh reverse-timestamp prefix; substring(19,27) keeps the scenario suffix of the old row key
  }.map(result => (vstartrow + result._1.substring(19, 27), result._2)).collect()
//insert new scenarios with new version (iepoch) to hactscenario table
val confputscen = HBaseConfiguration.create()
val tableputscen = new HTable(confputscen, "hactscenario")
//one Put per collected scenario; destructure the row instead of indexing the array repeatedly
for ((rowKey, (runs, meta)) <- ClusterResSet) {
  val putrun = new Put(rowKey.getBytes())
  //scenario metadata: group, score, temperature deltas, plus the new version epoch
  putrun.add("fd".getBytes(), "group".getBytes(), meta._1.toString.getBytes())
  putrun.add("fd".getBytes(), "score".getBytes(), meta._2.toString.getBytes())
  putrun.add("fd".getBytes(), "outtempdif".getBytes(), meta._3.toString.getBytes())
  putrun.add("fd".getBytes(), "tempdif".getBytes(), meta._4.toString.getBytes())
  putrun.add("fd".getBytes(), "iepoch".getBytes(), vepoch.toString.getBytes())
  //the six run columns
  putrun.add("fd".getBytes(), "run0".getBytes(), runs._1.toString.getBytes())
  putrun.add("fd".getBytes(), "run1".getBytes(), runs._2.toString.getBytes())
  putrun.add("fd".getBytes(), "run2".getBytes(), runs._3.toString.getBytes())
  putrun.add("fd".getBytes(), "run3".getBytes(), runs._4.toString.getBytes())
  putrun.add("fd".getBytes(), "run4".getBytes(), runs._5.toString.getBytes())
  putrun.add("fd".getBytes(), "run5".getBytes(), runs._6.toString.getBytes())
  tableputscen.put(putrun)
}
println("done")
tableputscen.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment