Skip to content

Instantly share code, notes, and snippets.

@jamesbulpin
Created October 9, 2016 15:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jamesbulpin/ee1e60d1a8316c1907d26670f9492a32 to your computer and use it in GitHub Desktop.
Save jamesbulpin/ee1e60d1a8316c1907d26670f9492a32 to your computer and use it in GitHub Desktop.
A proxy that forwards MQTT messages received from an Arduino over USB serial to an MQTT broker - designed to be used as part of a Kodi add-on
#!/usr/bin/python
import threading
import time
import glob
import os
import os.path
import signal
import sys
import Queue
import socket
sys.path.append('/storage/.kodi/addons/script.service.lwrf433/resources')
import serial
import paho.mqtt.client as mqtt
try:
    import xbmc
except ImportError:
    # The Kodi "xbmc" module is not importable (e.g. when run from a plain
    # shell), so provide a minimal stand-in that only implements log().
    class xbmc:
        """Fallback for the ``xbmc`` module exposing only ``log``."""

        @staticmethod
        def log(msg):
            """Write *msg* to standard output."""
            # Parenthesised form behaves the same under Python 2 and 3
            # for a single argument.
            print(msg)
# Interface type tag used for the Arduino boards listed below.
INTERFACE_TYPE_ARDUINO = 1

# Supported hardware: USB vendor ID -> product ID -> [interface type, name].
# Both vendor IDs list the same two boards; each vendor gets its own
# independent table.
USB_IDS = {
    vendor_id: {
        '0042': [INTERFACE_TYPE_ARDUINO, 'Arduino Mega 2560 R3'],
        '0043': [INTERFACE_TYPE_ARDUINO, 'Arduino Uno R3'],
    }
    for vendor_id in ('2341', '2a03')
}

# Per-interface-type settings; element 0 is used as the serial baud rate.
CONFIG = {INTERFACE_TYPE_ARDUINO: [57600]}

# Set to a true value to enable debug() output.
DEBUG = 0
def log(msg):
    """Pass *msg* to ``xbmc.log`` with the ``[lwrf433rx]`` prefix."""
    prefix = "[lwrf433rx] "
    xbmc.log(prefix + msg)
def debug(msg):
    """Pass *msg* to ``xbmc.log`` with a DEBUG tag; no-op unless DEBUG is set."""
    if not DEBUG:
        return
    xbmc.log("[lwrf433rx] DEBUG " + msg)
class State:
    """Shared bookkeeping for the proxy.

    Holds the device objects currently being monitored (indexed both by
    sysfs path and by /dev path), the registered service threads, the MQTT
    client handle and the ``clean_exit`` flag polled by the device watcher.
    """

    def __init__(self):
        self.mqttclient = None
        self.threads = []
        self.clean_exit = False
        self.by_syspath = {}
        self.by_devpath = {}

    def register_device(self, dobject):
        """Index *dobject* under both of its paths, then start it."""
        for index, key in ((self.by_syspath, dobject.syspath),
                           (self.by_devpath, dobject.devpath)):
            index[key] = dobject
        dobject.start()

    def unregister_device(self, dobject):
        """Remove *dobject* from both indexes, stop it and wait for it to end.

        Raises KeyError if the device is not currently registered.
        """
        self.by_syspath.pop(dobject.syspath)
        self.by_devpath.pop(dobject.devpath)
        dobject.stop_monitoring()
        dobject.join()

    def register_thread(self, t):
        """Remember thread *t*; a thread already registered is not added again."""
        if t in self.threads:
            return
        self.threads.append(t)
class ExtDevice(threading.Thread):
    """Thread that reads lines from one serial device and republishes them.

    Each received line is split on whitespace; when it has at least two
    tokens the first is published as the MQTT topic and the second as the
    payload, using the client stored in ``state.mqttclient``.

    Attributes:
        monitor: loop flag, cleared by ``stop_monitoring``.
        suicide: set when the thread stopped by itself because the serial
            port could not be opened or reading/processing failed.
        serial: the open serial port, or None before ``run`` opens it.
    """

    def __init__(self, state, syspath, devpath, interface_type):
        super(ExtDevice, self).__init__()
        self.state = state
        self.syspath = syspath
        self.devpath = devpath
        self.interface_type = interface_type
        self.monitor = True
        self.suicide = False
        self.serial = None
        # Element 0 of the per-interface configuration is the baud rate.
        self.baud = CONFIG[self.interface_type][0]

    def setup(self):
        """Hook for device-specific setup actions; override as needed."""
        pass

    def run(self):
        """Open the serial port and forward received lines until stopped.

        The loop ends when ``stop_monitoring`` is called or when the port
        cannot be opened or read. In the failure cases ``suicide`` is set
        so that the device watcher unregisters the device and probes it
        again on a later poll.
        """
        try:
            self.serial = serial.Serial(self.devpath, self.baud, timeout=10)
        except Exception as e:
            # Without this the thread would die silently and the device
            # would stay registered but unmonitored.
            log("Cannot open %s: %s" % (self.devpath, e))
            self.suicide = True
            return
        try:
            self.setup()
            while self.monitor:
                try:
                    # The read timeout makes readline() return periodically
                    # so the monitor flag is re-checked.
                    line = self.serial.readline()
                    if line:
                        self.process_message(line.strip())
                except Exception as e:
                    debug("Serial read on %s failed: %s" % (self.devpath, e))
                    self.suicide = True
                    break
        finally:
            # Always release the port so a re-registered device can reopen it.
            self._close_port()
        debug("Stopping monitoring %s" % (self.devpath))

    def _close_port(self):
        """Close the serial port, logging (not raising) any failure."""
        try:
            self.serial.close()
        except Exception as e:
            debug("serial close excp '%s'" % (str(e)))

    def process_message(self, msg):
        """Publish the first two tokens of *msg* as MQTT topic and payload.

        Lines with fewer than two whitespace-separated tokens are only
        logged. Publish failures are logged and otherwise ignored so that a
        single bad message does not stop monitoring.
        """
        tokens = msg.split()
        if len(tokens) > 1:
            topic, payload = tokens[0], tokens[1]
            try:
                debug("Sending message to home automation broker: %s=%s"
                      % (topic, payload))
                self.state.mqttclient.publish(topic, payload)
            except Exception as e:
                debug("MQTT publish excp '%s'" % (str(e)))
        debug("MSG '%s'" % (msg))

    def stop_monitoring(self):
        """Ask the read loop in ``run`` to finish after its current read."""
        self.monitor = False

    def send_message(self, msg):
        """Write *msg* plus a newline to the serial port; failures are logged."""
        try:
            self.serial.write(msg + "\n")
        except Exception as e:
            debug("serial TX excp '%s'" % (str(e)))
class DeviceWatcher(threading.Thread):
    """Thread that polls sysfs for supported USB serial devices.

    Until ``state.clean_exit`` is set it repeatedly (every five seconds)
    registers newly found devices listed in ``USB_IDS`` with the shared
    state, and unregisters devices that have disappeared or whose reader
    thread has flagged ``suicide``.
    """

    def __init__(self, state):
        super(DeviceWatcher, self).__init__()
        self.state = state

    def run(self):
        """Poll for device changes until ``state.clean_exit`` becomes true."""
        while not self.state.clean_exit:
            debug("Checking available USB devices")
            present = self._scan()
            self._reap(present)
            time.sleep(5)

    def _scan(self):
        """Probe all USB devices; return the set of sysfs dirs in use.

        The result contains devices that were already registered and are
        still present, plus devices registered during this scan.
        """
        present = set()
        for vendor_file in glob.glob("/sys/bus/usb/devices/*/idVendor"):
            sysdir = os.path.dirname(vendor_file)
            if sysdir in self.state.by_syspath:
                debug("Skipping already known %s" % (vendor_file))
                present.add(sysdir)
                continue
            try:
                if self._probe(vendor_file, sysdir):
                    present.add(sysdir)
            except Exception as e:
                # Best effort: one unreadable device must not stop the scan.
                debug(str(e))
        return present

    @staticmethod
    def _read_id(path):
        """Return the stripped contents of the sysfs attribute file *path*."""
        with open(path, 'r') as f:
            return f.read().strip()

    def _probe(self, vendor_file, sysdir):
        """Register the device at *sysdir* if it is supported and has a tty.

        Returns True when a device was registered, False otherwise.
        """
        debug("Checking %s" % (vendor_file))
        vid = self._read_id(vendor_file)
        debug("Vendor ID %s" % (vid))
        if vid not in USB_IDS:
            return False
        debug("Found known vendor ID %s" % (vid))

        product_file = os.path.join(sysdir, 'idProduct')
        if not os.path.exists(product_file):
            return False
        debug("Checking %s" % (product_file))
        pid = self._read_id(product_file)
        if pid not in USB_IDS[vid]:
            return False
        debug("Found known product ID %s:%s" % (vid, pid))

        # Look for the tty node under either sysfs layout.
        candidates = glob.glob(os.path.join(sysdir, '*', 'tty', '*'))
        usb_ttys = glob.glob(os.path.join(sysdir, '*', 'ttyUSB*'))
        debug("ttyUSB candidates: %s" % (usb_ttys,))
        candidates.extend(usb_ttys)
        if not candidates:
            return False

        tty = os.path.join("/dev", os.path.basename(candidates[0]))
        if not os.path.exists(tty):
            return False

        details = USB_IDS[vid][pid]
        log("Found %s device %s" % (details[1], tty))
        dobject = ExtDevice(self.state, sysdir, tty, details[0])
        self.state.register_device(dobject)
        return True

    def _reap(self, present):
        """Unregister devices that vanished or whose thread self-terminated."""
        # Iterate over snapshots: unregister_device() mutates by_syspath.
        for syspath, dobject in list(self.state.by_syspath.items()):
            if syspath not in present:
                log("Device at %s has gone away" % (syspath))
                self.state.unregister_device(dobject)
        # Devices that self-terminated and were not cleaned up above are
        # removed so they get re-polled next time.
        for dobject in list(self.state.by_syspath.values()):
            if dobject.suicide:
                log("Device at %s has self-terminated" % (dobject.syspath))
                self.state.unregister_device(dobject)
class MQTTClient(threading.Thread):
def __init__(self, state, serverip, port):
super(MQTTClient, self).__init__()
self.state = state
debug("Connecting to MQTT broker at " + serverip)
self.mqttclient = mqtt.Client(protocol=mqtt.MQTTv31)
self.mqttclient.connect(serverip, port, 60)
self.state.mqttclient = self.mqttclient
def run(self):
while 1:
rc = self.mqttclient.loop(10)
if rc == 7:
self.mqttclient.reconnect()
class Watcher:
    """Work around missed SIGINT delivery in multithreaded programs.

    Two problems are addressed: (1) a signal might be delivered to any
    thread, and (2) if the thread that gets the signal is waiting, the
    signal is ignored.

    The watcher is a separate process (not a thread): constructing it
    forks, the child carries on running the program and its threads, and
    the parent just waits for the child or for a KeyboardInterrupt.
    See Appendix A of The Little Book of Semaphores.
    http://greenteapress.com/semaphores/

    Only tested on Linux by the original author, who expected it to work
    on the Macintosh and not on Windows.
    From: http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/
    """

    def __init__(self):
        """Fork; the child returns, the parent blocks in ``watch``."""
        self.child = os.fork()
        if self.child != 0:
            # Parent process: never returns to the caller (watch() exits).
            self.watch()

    def watch(self):
        """Wait for the child; on KeyboardInterrupt kill it. Then exit."""
        try:
            os.wait()
        except KeyboardInterrupt:
            # The capital B in KeyBoardInterrupt makes it possible to tell
            # when it was the Watcher that received the SIGINT.
            print('KeyBoardInterrupt')
            self.kill()
        sys.exit()

    def kill(self):
        """Send SIGKILL to the child, ignoring OSError from ``os.kill``."""
        try:
            os.kill(self.child, signal.SIGKILL)
        except OSError:
            pass
def main(mqttip):
    """Start the proxy: signal watcher, MQTT client and device watcher.

    Args:
        mqttip: address of the MQTT broker; port 1883 is used.
    """
    # Forks: only the child process continues past this call.
    Watcher()
    state = State()
    # Connect to the broker before any device thread exists, so that
    # state.mqttclient is set when the first message arrives and a failed
    # connect does not leave a device watcher thread running on its own.
    mq = MQTTClient(state, mqttip, 1883)
    dw = DeviceWatcher(state)
    for worker in (dw, mq):
        worker.start()
        state.register_thread(worker)
if __name__ == "__main__":
    if len(sys.argv) < 2:
        log("No MQTT IP address provided")
        sys.exit(1)
    mqttip = sys.argv[1]
    # Record this process's PID. The with-block closes (and flushes) the
    # file before main() forks.
    with open("/tmp/433listener.py.pid", "w") as pidfile:
        pidfile.write(str(os.getpid()))
    main(mqttip)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment