Skip to content

Instantly share code, notes, and snippets.

@niektemme
Created July 31, 2015 10:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save niektemme/0295b0c5a60d839ccbbb to your computer and use it in GitHub Desktop.
Save niektemme/0295b0c5a60d839ccbbb to your computer and use it in GitHub Desktop.
Smart Thermostat AWS Spark - part Spark cluster final
//alternative scenarios get highest score
//NOTE(review): original sorted the whole RDD descending just to take the first key;
//max() gives the same value without a full shuffle/sort
val maxscore : Int = RDDtotalClustered.map(result => result._2._1).max() + 10
//determine cluster centers (centers are new tempdif and outtempdif)
val RDDcenters = sc.parallelize(clusters.clusterCenters)
//key each center vector by the cluster id the model assigns to it
val RDDcentersCalc = RDDcenters.map { center => (clusters.predict(center), center) }
RDDcentersCalc.cache()
//join best scenarios for temperature groups with cluster centers
//result is keyed by the scenario row key with (clusterId, score, rounded center coords)
val RDDClusterWithCenters = RDDtotalClustered.join(RDDcentersCalc).map {
  case (clusterId, ((score, scenKey), center)) =>
    (scenKey, (clusterId, score, math.round(center(0)), math.round(center(1))))
}
RDDClusterWithCenters.cache()
//scan scenario hbase table to retrieve scenario information
//reverse-epoch row prefix so the newest scenario versions sort first in HBase
//("40b5af01_" is presumably a fixed device/sensor id — confirm against the writer)
val bvepoch: Long = 9999999999L
val vepoch: Long = System.currentTimeMillis / 1000
val startmepoch: Long = bvepoch - vepoch
val vstartrow: String = s"40b5af01_$startmepoch"
//configure a full-table scan (no start/stop row or filters) over "hactscenario"
val confscenvals = HBaseConfiguration.create()
confscenvals.set(TableInputFormat.INPUT_TABLE, "hactscenario")
val scanscenvals = new Scan()
confscenvals.set(TableInputFormat.SCAN, convertScanToString(scanscenvals))
//read the table as an RDD of (row key bytes, Result)
val RDDconfscenvals = sc.newAPIHadoopRDD(
  confscenvals,
  classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
//turn each HBase Result into (rowKey, (run columns, scenario info)),
//injecting maxscore as the score for every stored scenario
val RDDscenvals = RDDconfscenvals.map(_._2).map { result =>
  //read one string-valued qualifier from the "fd" column family
  def fd(qualifier: String): String =
    Bytes.toString(result.getValue("fd".getBytes(), qualifier.getBytes()))
  (Bytes.toString(result.getRow()),
    (
      (fd("run0"), fd("run1"), fd("run2"), fd("run3"), fd("run4"), fd("run5")),
      (fd("group"), maxscore, fd("outtempdif"), fd("tempdif"))
    )
  )
}
RDDscenvals.cache()
//join newly created scenarios with all other scenarios to create complete list of scenarios (best and alternative)
//collect all to a new array
//NOTE(review): replaced `== None` / `.get` with getOrElse — same behavior, no unsafe Option access
val ClusterResSet = RDDscenvals.leftOuterJoin(RDDClusterWithCenters)
  .mapValues { case ((runs, storedInfo), clusteredOpt) =>
    //prefer the freshly clustered (best) values; fall back to the stored scenario info
    (runs, clusteredOpt.getOrElse(storedInfo))
  }
  //re-key onto the new version prefix; substring(19,27) presumably extracts the
  //scenario-suffix portion of the old row key — verify against the row format
  .map { case (rowKey, value) => (vstartrow + rowKey.substring(19, 27), value) }
  .collect()
//insert new scenarios with new version (iepoch) to hactscenario table
val confputscen = HBaseConfiguration.create()
val tableputscen = new HTable(confputscen, "hactscenario")
try {
  //iterate elements directly instead of by index
  for ((rowKey, (runs, info)) <- ClusterResSet) {
    val putrun = new Put(rowKey.getBytes())
    putrun.add("fd".getBytes(), "group".getBytes(), info._1.toString.getBytes())
    putrun.add("fd".getBytes(), "score".getBytes(), info._2.toString.getBytes())
    putrun.add("fd".getBytes(), "outtempdif".getBytes(), info._3.toString.getBytes())
    putrun.add("fd".getBytes(), "tempdif".getBytes(), info._4.toString.getBytes())
    putrun.add("fd".getBytes(), "iepoch".getBytes(), vepoch.toString.getBytes())
    putrun.add("fd".getBytes(), "run0".getBytes(), runs._1.toString.getBytes())
    putrun.add("fd".getBytes(), "run1".getBytes(), runs._2.toString.getBytes())
    putrun.add("fd".getBytes(), "run2".getBytes(), runs._3.toString.getBytes())
    putrun.add("fd".getBytes(), "run3".getBytes(), runs._4.toString.getBytes())
    putrun.add("fd".getBytes(), "run4".getBytes(), runs._5.toString.getBytes())
    putrun.add("fd".getBytes(), "run5".getBytes(), runs._6.toString.getBytes())
    tableputscen.put(putrun)
  }
  println("done")
} finally {
  //always release the HTable — original leaked it if any put threw
  tableputscen.close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment