Smart Thermostat AWS Spark - Spark cluster part: retrieve temperature scenarios
//imports: Spark core plus the HBase client/MapReduce integration classes used below
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

//spark settings
val sparkconf = new SparkConf().setAppName("tempcluster")
val sc = new SparkContext(sparkconf)

//scan the HBase used-scenario table ("husedscenario") to collect used scenarios that have been scored
val confonoff = HBaseConfiguration.create()
val scanonoff = new Scan()
scanonoff.addColumn("fd".getBytes(), "outtempdif".getBytes())
scanonoff.addColumn("fd".getBytes(), "tempdif".getBytes())
scanonoff.addColumn("fd".getBytes(), "score".getBytes())
scanonoff.addColumn("fd".getBytes(), "scenariokey".getBytes())
scanonoff.setStopRow("40b5af01_9999999999_40b5af01_9999999999_9999999".getBytes())
confonoff.set(TableInputFormat.INPUT_TABLE, "husedscenario")
confonoff.set(TableInputFormat.SCAN, convertScanToString(scanonoff)) //serialize the Scan for TableInputFormat (helper sketched below)

//create RDD with used scenarios: one (ImmutableBytesWritable, Result) pair per HBase row
val RDDonOff = sc.newAPIHadoopRDD(confonoff, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
val valRowsOnOff = RDDonOff.map(tuple => tuple._2 //keep only the Result
).filter(result => Bytes.toString(result.getValue("fd".getBytes(), "score".getBytes())) != null //only select scored scenarios
).map(result => ( //map each row to (scenariokey, (outtempdif, tempdif, score))
  Bytes.toString(result.getValue("fd".getBytes(), "scenariokey".getBytes())),
  ( Bytes.toString(result.getValue("fd".getBytes(), "outtempdif".getBytes())).toInt,
    Bytes.toString(result.getValue("fd".getBytes(), "tempdif".getBytes())).toInt,
    Bytes.toString(result.getValue("fd".getBytes(), "score".getBytes())).toLong )
))
valRowsOnOff.cache() //cache the scored-scenario RDD for reuse
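
The snippet relies on a convertScanToString helper that is not included in this gist. Below is a minimal sketch of such a helper, assuming the HBase 0.98/1.x-era ProtobufUtil and Base64 utilities; the actual helper in the full project may differ.

import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64

//sketch: serialize a Scan into the Base64-encoded protobuf string that TableInputFormat.SCAN expects
def convertScanToString(scan: Scan): String = {
  val proto = ProtobufUtil.toScan(scan)  //client Scan -> protobuf representation
  Base64.encodeBytes(proto.toByteArray)  //protobuf bytes -> Base64 string for the job configuration
}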