Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
#/usr/bin/python
#
# Script that pulls a reltime energy consumption feed from nº10 Downning street
# The data is processed and sent to an 8x8 dot matrix display to be visualised.
#
# Copyleft Gonzalo Garcia-Perate g.garcia-perate [at] ucl.ac.uk
# for more info visit http://makingsenseofspace.com, http://vimeo.com/16567491
#
from restkit import request
import json, pdb, time, serial
class EnergyData():
def __init__( self, date = None, n_items = 8 ):
if date is None:
from datetime import datetime
t = datetime.today()
self.date = "-".join( str( i ) for i in [t.year, t.month, t.day] )
self.today = True
else:
self.date = date
self.hour_start = '+00:00:00'
self.hour_end = '+23:59:59'
self.data_service = "http://data.carbonculture.net/number10/10-downing-street/elec?with-timestamp=true&start-time="
self.n_items = n_items
def load_data( self ):
if self.today:
self.req = request( self.data_service + self.date + self.hour_start )
else:
self.req = request( self.data_service + self.date + self.hour_start + "&end_time=" + self.date + self.hour_end )
try:
self.raw_data = json.loads( self.req.body_string() )
except:
print "data has already been read"
self.results = self.raw_data["results"][-self.n_items * 2:]
self.energy_data = []
self.scaled_data = []
self.rounded_data = []
for i in range( 0, len( self.results ) ):
if i % 2 == 0:
#self.energy_data.append( ( self.results[i][1] + self.results[i + 1][1] ) / 2 ) # we can average hourly data if we want although it smoothes the trend line
self.energy_data.append( self.results[i][1] ) #we only take data at hourly intervals
[self.scaled_data.append( self.scale_number( item ) ) for item in self.energy_data]
[self.rounded_data.append( int( round( self.scale_number( item ) ) ) ) for item in self.energy_data]
self.req.close()
def get_trend( self ):
return self.rounded_data #returns data for the last 8 hours
def scale_number( self, n, a = 8.0, b = 1.0 ):
old_range = ( max( self.energy_data ) - min( self.energy_data ) )
new_range = ( a - b )
return ( ( ( n - min( self.energy_data ) ) * new_range ) / old_range ) + 1.0
# pdb.set_trace()
def print_data( self ):
# print(data)
print self.results
print self.energy_data
print self.scaled_data
print self.rounded_data
print "".join( str( i ) for i in self.rounded_data )
print "max energy value: %s" % ( max( self.energy_data ), )
print "min energy value: %s" % ( min( self.energy_data ), )
print "number of items: %s" % ( len( self.scaled_data ), )
class SerialDisplay():
# def __init__( self, port = "/dev/tty.usbserial-A6008bSe", baud = 9600 ):
def __init__( self, port = "/dev/tty.BlueRadios-COM0", baud = 9600 ): #Bluetooth modem
self.s = serial.Serial( port, baud )
def send_data( self, data ):
# print "seding data to display %s" % ( data, )
self.s.write( data.encode( "ascii" ) )
time.sleep( 1 )
# print "Received: %s" % self.s.read( 10 ).__repr__()
def close( self ):
self.s.close()
def main():
e = EnergyData()
s = SerialDisplay()
while 1:
e.load_data()
e.print_data()
rounded_data = e.get_trend()
comparison_data = rounded_data[-2:]
print
print "-------------------"
time.sleep( 20 ) #we need roughly 15-20 secs to stablish robust comms with Bluetooth transciever
print
print "sending plot data %s " % ( rounded_data, )
s.send_data( "<pl" + "".join( str( i ) for i in rounded_data ) + ">" ) #we send trend data to plot
time.sleep( 15 )
print
print "sending ab comparison data %s " % ( comparison_data, )
s.send_data( "<ab" + "".join( str( i ) for i in comparison_data ) + ">" )
time.sleep( 15 )
print
print "sending fluctuation data %s" % ( comparison_data, )
s.send_data( "<fl" + "".join( str( i ) for i in comparison_data ) + ">" )
#this should be somewhere around 30 min (1800 s) since that's the time interval data is published
#in which case we would close the serial port too
time.sleep( 15 )
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment