Created
October 9, 2016 15:45
-
-
Save jamesbulpin/ee1e60d1a8316c1907d26670f9492a32 to your computer and use it in GitHub Desktop.
A proxy that forwards MQTT messages received over Arduino USB serial to a MQTT broker - designed to be used as part of a Kodi add-on
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import threading | |
import time | |
import glob | |
import os | |
import os.path | |
import signal | |
import sys | |
import Queue | |
import socket | |
sys.path.append('/storage/.kodi/addons/script.service.lwrf433/resources') | |
import serial | |
import paho.mqtt.client as mqtt | |
try: | |
import xbmc | |
except ImportError, e: | |
class xbmc: | |
@staticmethod | |
def log(str): | |
print str | |
INTERFACE_TYPE_ARDUINO = 1 | |
USB_IDS = {'2341': {'0042': [INTERFACE_TYPE_ARDUINO, 'Arduino Mega 2560 R3'], | |
'0043': [INTERFACE_TYPE_ARDUINO, 'Arduino Uno R3']}, | |
'2a03': {'0042': [INTERFACE_TYPE_ARDUINO, 'Arduino Mega 2560 R3'], | |
'0043': [INTERFACE_TYPE_ARDUINO, 'Arduino Uno R3']}} | |
CONFIG = {INTERFACE_TYPE_ARDUINO: [57600]} | |
DEBUG = 0 | |
def log(msg): | |
xbmc.log("[lwrf433rx] " + msg) | |
def debug(msg): | |
if DEBUG: | |
xbmc.log("[lwrf433rx] DEBUG " + msg) | |
class State: | |
def __init__(self): | |
self.by_devpath = {} | |
self.by_syspath = {} | |
self.clean_exit = False | |
self.threads = [] | |
self.mqttclient = None | |
def register_device(self, dobject): | |
self.by_syspath[dobject.syspath] = dobject | |
self.by_devpath[dobject.devpath] = dobject | |
dobject.start() | |
def unregister_device(self, dobject): | |
del self.by_syspath[dobject.syspath] | |
del self.by_devpath[dobject.devpath] | |
dobject.stop_monitoring() | |
dobject.join() | |
def register_thread(self, t): | |
if not t in self.threads: | |
self.threads.append(t) | |
class ExtDevice(threading.Thread): | |
def __init__(self, state, syspath, devpath, interface_type): | |
super(ExtDevice, self).__init__() | |
self.state = state | |
self.syspath = syspath | |
self.devpath = devpath | |
self.interface_type = interface_type | |
self.monitor = True | |
self.suicide = False | |
self.baud = CONFIG[self.interface_type][0] | |
def setup(self): | |
# Override to perform device specific setup actions | |
pass | |
def run(self): | |
self.serial = serial.Serial(self.devpath, self.baud, timeout=10) | |
self.setup() | |
while self.monitor: | |
try: | |
x = self.serial.readline() | |
if x: | |
self.process_message(x.strip()) | |
except Exception as e: | |
self.suicide = True | |
break | |
debug("Stopping monitoring %s" % (self.devpath)) | |
def process_message(self, msg): | |
ll = msg.split() | |
if len(ll) > 1: | |
try: | |
debug("Sending message to home automation broker: %s=%s" % (ll[0], ll[1])) | |
self.state.mqttclient.publish(ll[0], ll[1]); | |
except Exception as e: | |
debug("MQTT publish excp '%s'" % (str(e))) | |
pass | |
debug("MSG '%s'" % (msg)) | |
def stop_monitoring(self): | |
self.monitor = False | |
def send_message(self, msg): | |
try: | |
self.serial.write(msg + "\n") | |
except Exception as e: | |
debug("serial TX excp '%s'" % (str(e))) | |
pass | |
class DeviceWatcher(threading.Thread): | |
def __init__(self, state): | |
super(DeviceWatcher, self).__init__() | |
self.state = state | |
def run(self): | |
while not self.state.clean_exit: | |
debug("Checking available USB devices") | |
foundsps = [] | |
spaths = glob.glob("/sys/bus/usb/devices/*/idVendor") | |
for sp in spaths: | |
if os.path.dirname(sp) in self.state.by_syspath.keys(): | |
debug("Skipping already known %s" % (sp)) | |
foundsps.append(os.path.dirname(sp)) | |
continue | |
try: | |
serialstring = None | |
debug("Checking %s" % (sp)) | |
f = open(sp, 'r') | |
vid = f.read().strip() | |
f.close() | |
print vid | |
if vid in USB_IDS.keys(): | |
debug("Found known vendor ID %s" % (vid)) | |
d = os.path.dirname(sp) | |
spp = os.path.join(d, 'idProduct') | |
if os.path.exists(spp): | |
debug("Checking %s" % (spp)) | |
f = open(spp, 'r') | |
pid = f.read().strip() | |
f.close() | |
if pid in USB_IDS[vid]: | |
debug("Found known product ID %s:%s" % (vid, pid)) | |
g = glob.glob(os.path.join(d, '*', 'tty', '*')) | |
g2 = glob.glob(os.path.join(d, '*', 'ttyUSB*')) | |
print g2 | |
g.extend(g2) | |
if len(g) > 0: | |
tty = os.path.join("/dev", | |
os.path.basename(g[0])) | |
if os.path.exists(tty): | |
details = USB_IDS[vid][pid] | |
log("Found %s device %s" % (details[1], tty)) | |
dobject = ExtDevice(self.state, d, tty, details[0]) | |
self.state.register_device(dobject) | |
foundsps.append(d) | |
except Exception, e: | |
debug(str(e)) | |
pass | |
# Check for any devices that have gone away | |
for sp in self.state.by_syspath.keys(): | |
if not sp in foundsps: | |
log("Device at %s has gone away" % (sp)) | |
self.state.unregister_device(self.state.by_syspath[sp]) | |
# If any devices has self-terminated and were not cleaned-up | |
# above then remove them so they get re-polled next time | |
for dobject in self.state.by_syspath.values(): | |
if dobject.suicide: | |
log("Device at %s has self-terminated" % (dobject.syspath)) | |
self.state.unregister_device(dobject) | |
time.sleep(5) | |
class MQTTClient(threading.Thread): | |
def __init__(self, state, serverip, port): | |
super(MQTTClient, self).__init__() | |
self.state = state | |
debug("Connecting to MQTT broker at " + serverip) | |
self.mqttclient = mqtt.Client(protocol=mqtt.MQTTv31) | |
self.mqttclient.connect(serverip, port, 60) | |
self.state.mqttclient = self.mqttclient | |
def run(self): | |
while 1: | |
rc = self.mqttclient.loop(10) | |
if rc == 7: | |
self.mqttclient.reconnect() | |
class Watcher: | |
"""this class solves two problems with multithreaded | |
programs in Python, (1) a signal might be delivered | |
to any thread (which is just a malfeature) and (2) if | |
the thread that gets the signal is waiting, the signal | |
is ignored (which is a bug). | |
The watcher is a concurrent process (not thread) that | |
waits for a signal and the process that contains the | |
threads. See Appendix A of The Little Book of Semaphores. | |
http://greenteapress.com/semaphores/ | |
I have only tested this on Linux. I would expect it to | |
work on the Macintosh and not work on Windows. | |
From: http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/ | |
""" | |
def __init__(self): | |
""" Creates a child thread, which returns. The parent | |
thread waits for a KeyboardInterrupt and then kills | |
the child thread. | |
""" | |
self.child = os.fork() | |
if self.child == 0: | |
return | |
else: | |
self.watch() | |
def watch(self): | |
try: | |
os.wait() | |
except KeyboardInterrupt: | |
# I put the capital B in KeyBoardInterrupt so I can | |
# tell when the Watcher gets the SIGINT | |
print 'KeyBoardInterrupt' | |
self.kill() | |
sys.exit() | |
def kill(self): | |
try: | |
os.kill(self.child, signal.SIGKILL) | |
except OSError: pass | |
def main(mqttip): | |
Watcher() | |
state = State() | |
dw = DeviceWatcher(state) | |
dw.start() | |
state.register_thread(dw) | |
mq = MQTTClient(state, mqttip, 1883) | |
mq.start() | |
state.register_thread(mq) | |
if __name__ == "__main__": | |
if len(sys.argv) < 2: | |
log("No MQTT IP address provided") | |
sys.exit(1) | |
mqttip = sys.argv[1] | |
file("/tmp/433listener.py.pid","w").write(str(os.getpid())) | |
main(mqttip) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment