Skip to content

Instantly share code, notes, and snippets.

@nicovillanueva
Last active August 29, 2015 14:23
Show Gist options
  • Save nicovillanueva/1648b066e4445f89b2cc to your computer and use it in GitHub Desktop.
Save nicovillanueva/1648b066e4445f89b2cc to your computer and use it in GitHub Desktop.
Relay + Temp sensor
#!/usr/bin/python
import Adafruit_BBIO.GPIO as GPIO
from w1thermsensor import W1ThermSensor
import time, json, thread
TARGETTEMP = 80.0
THRESHOLD = 1.0
READDELAY = 5
DUMPEVERY = 500
DUMPFILE = "readings.json"
relaypin = "P8_10"
sensor = None
limits = {"high": 0.0, "low": 999.0}
readings = []
readingsamount = 0
timestart = time.time()
def setup():
global sensor
sensor = W1ThermSensor()
GPIO.setup(relaypin, GPIO.OUT)
GPIO.output(relaypin, GPIO.HIGH) # Turn off relay by default
with open(DUMPFILE, "w") as f:
l = "{\"readings\":["
f.writelines(l)
def teardown():
import os
global readings, readingsamount
timeend = time.time()
final_report = {"timings": {"start": timestart, "end": timeend}}
switch_relay(False)
dump_readings(readings)
with open(DUMPFILE, "a+") as f:
# We can't 'natively' create a JSON file that progressively dumps to
# a file. To do so would mean loading the whole file into memory, and
# them dumping it to a file once again, defeating the
# purpose of doing an "append only" file. That's why we build
# the JSON file manually.
f.seek(-1, os.SEEK_END)
f.truncate() # Remove the last comma introduced by 'dump_readings()'
f.writelines("], %s}" % (repr(final_report)[1:-1]).replace("'", "\""))
print "Shutting down. Final temp: %.3f C" % sensor.get_temperature()
print "Did %i readings" % readingsamount
def switch_relay(setting):
GPIO.output(relaypin, GPIO.LOW if setting else GPIO.HIGH)
def relay_status():
return True if GPIO.input(relaypin) == 0 else False
def dump_readings(readings):
# Protip: It's estimated that 9 days of logging every 2 seconds
# would take up around 12 GB of storage.
with open(DUMPFILE, "a+") as f:
f.writelines("%s," % json.dumps(readings)[1:-1])
def status_report(temp, readings):
print "Target | %.3f +/- %.3f" % (TARGETTEMP, THRESHOLD)
print "Temperature | %.3f" % temp
print "Highest/lowest | %.3f / %.3f" % (limits["high"], limits["low"])
print "Readings | %i" % readingsamount
print "Relay status | %s" % ("ON" if relay_status() else "OFF")
print ""
def main():
global readings, readingsamount
while True:
temp = sensor.get_temperature()
if temp < (TARGETTEMP - THRESHOLD):
if not relay_status():
switch_relay(True)
#elif temp > (TARGETTEMP + THRESHOLD):
# Cooling code would go here.
#if relay_status():
# switch_relay(False)
else:
# Upper limit is useless without true cooling,
# so just turn off at tripping point
if relay_status():
switch_relay(False)
if temp > limits.get("high"):
limits["high"] = temp
if temp < limits.get("low"):
limits["low"] = temp
newread = {"time": time.time(), "temp": temp}
if len(readings) == 0 or readings[-1].get("temp") != newread.get("temp"):
readings.append(newread)
readingsamount += 1
status_report(temp, readingsamount)
if len(readings) >= DUMPEVERY:
print "Dumping to file..."
thread.start_new_thread(dump_readings, (readings[:],))
readings = []
time.sleep(READDELAY)
if __name__ == '__main__':
try:
setup()
main()
except KeyboardInterrupt:
#switch_relay(False)
teardown()
except Exception as e:
print "ERROR! Something happened: "
print e
teardown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment