Skip to content

Instantly share code, notes, and snippets.

@lpereira
Last active August 29, 2015 14:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lpereira/31b2ace8acf07594e5e9 to your computer and use it in GitHub Desktop.
Save lpereira/31b2ace8acf07594e5e9 to your computer and use it in GitHub Desktop.
IoTomada
#!/usr/bin/python
import mraa
import time
import pyupm_i2clcd
import pyupm_grove
import SimpleXMLRPCServer
import netifaces as ni
import threading
import Queue
import os
import requests
class Background(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.queue = Queue.Queue()
def run(self):
while True:
try:
func, args = self.queue.get(block=True)
func(*args)
except Exception, e:
print "Ignoring exception:", e
def enqueue(self, func, *args):
self.queue.put((func, args))
class Dispatcher:
def __init__(self, **proxies):
self.proxies = proxies
def _dispatch(self, method, params):
m = method.split('_', 1)
if len(m) < 2:
return False
proxy = self.proxies[m[0]]
attr = getattr(proxy, 'api_' + m[1], None)
if attr:
proxy.enqueue(attr, *params)
return True
return False
class Gpio(Background):
def __init__(self, pin):
Background.__init__(self)
self.pin = mraa.Gpio(pin)
self.pin.dir(mraa.DIR_OUT)
self.start()
def api_liga(self):
self.pin.write(1)
def api_desliga(self):
self.pin.write(0)
def constrain(value, vmin, vmax):
if value < vmin:
return vmin
if value > vmax:
return vmax
return value
class Sistema(Background):
def __init__(self):
Background.__init__(self)
def api_reboot(self):
os.system("reboot")
def api_soft_reset(self):
os.exit(0)
class LCD(Background):
def __init__(self, lcd):
Background.__init__(self)
self.lcd = lcd
self.start()
def api_msg(self, msg):
self.lcd.setCursor(1, 1)
self.lcd.write(msg)
def api_msg_color(self, r, g, b):
r = constrain(r, 0, 255)
g = constrain(g, 0, 255)
b = constrain(b, 0, 255)
self.lcd.setColor(r, g, b)
class LCDScroller(threading.Thread):
def __init__(self, lcd, lampada):
threading.Thread.__init__(self)
self.lcd = lcd
self.lampada = lampada
self.button = pyupm_grove.GroveButton(8)
def run(self):
last_btn = self.button.value()
btn = not last_btn
while True:
if btn != last_btn:
last_btn = btn
if last_btn:
self.lcd.setColor(128, 64, 32)
self.lampada.api_liga()
else:
self.lcd.setColor(64, 128, 32)
self.lampada.api_desliga()
self.lcd.scroll(not not btn)
time.sleep(0.2)
btn = self.button.value()
class Talkback(threading.Thread):
def __init__(self, dispatcher, lcd):
threading.Thread.__init__(self)
self.dispatcher = dispatcher
self.lcd = lcd
self.start()
def run(self):
while True:
self.lcd.setColor(255, 128, 64)
try:
r = requests.get("https://api.thingspeak.com/talkbacks/XXXX/commands/execute",
params = {'api_key': 'YYYY'})
method = r.text.replace('-', '_')
self.dispatcher._dispatch(method, ())
except:
pass
finally:
self.lcd.setColor(64, 255, 128)
time.sleep(5)
def get_ip_addr(iface):
while True:
try:
ip = ni.ifaddresses(iface)
return ip[2][0]['addr']
except:
sleep(0.1)
if __name__ == '__main__':
print 'Init LCD'
lcd = pyupm_i2clcd.Jhd1313m1(6, 0x3e, 0x62)
ip = get_ip_addr('wlan0')
lcd.setCursor(0, 1)
lcd.write("Endereco IP: " + ip)
lcd.setCursor(1, 1)
lcd.write("#intelmaker Sao Paulo 2015")
lampada = Gpio(2)
lcd_scroller = LCDScroller(lcd, lampada)
lcd_scroller.start()
print 'Init XML-RPC server'
server = SimpleXMLRPCServer.SimpleXMLRPCServer(('0.0.0.0', 8080))
print 'Register xml proxy for lampada'
dispatcher = Dispatcher(lampada=lampada, tomada=Gpio(3), lcd=LCD(lcd), sistema=Sistema())
talkback = Talkback(dispatcher, lcd)
server.register_instance(dispatcher)
print 'Serving'
server.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment