Skip to content

Instantly share code, notes, and snippets.

@un1tz3r0
Created August 26, 2018 20:38
Show Gist options
  • Save un1tz3r0/9eb81b1577b0bfd7db91661dfe63c513 to your computer and use it in GitHub Desktop.
Save un1tz3r0/9eb81b1577b0bfd7db91661dfe63c513 to your computer and use it in GitHub Desktop.
This is a sample implementation of a Linux udev monitor that watches for devices being added and removed and spawns/stops a reader thread for each active device. The classes are reusable, and some of them implement patterns that are common in asynchronous programs. The simple example application at the end demonstrates reading from all connected input …
from contextlib import contextmanager
import pyudev
from select import select
import threading
import select, os, queue, socket
class PollableQueue(queue.Queue):
    """A ``queue.Queue`` that can be waited on with ``select.select()``.

    Each ``put()`` also writes one byte to an internal socket pair and each
    ``get()`` consumes one byte, so the read end of the pair (exposed through
    ``fileno()``) becomes readable once an item has been queued.
    """

    def __init__(self):
        super().__init__()
        # Create a pair of connected sockets
        if os.name == 'posix':
            self._putsocket, self._getsocket = socket.socketpair()
        else:
            # Compatibility on non-POSIX systems
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                server.bind(('127.0.0.1', 0))
                server.listen(1)
                self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self._putsocket.connect(server.getsockname())
                self._getsocket, _ = server.accept()
            finally:
                # The listening socket is only needed to build the pair.
                server.close()

    def fileno(self):
        """Return the descriptor of the read end, so select() accepts this object."""
        return self._getsocket.fileno()

    def put(self, item, block=True, timeout=None):
        """Queue *item* and signal the read end of the socket pair.

        ``block`` and ``timeout`` are accepted and forwarded so that the
        inherited ``put_nowait()`` (which calls ``put(item, block=False)``)
        keeps working.
        """
        super().put(item, block, timeout)
        self._putsocket.send(b'x')

    def get(self, block=True, timeout=None):
        """Remove and return the next item.

        With the defaults this blocks until an item is available. When
        ``block`` is false or ``timeout`` is given, the read end is polled
        for at most that long and ``queue.Empty`` is raised if nothing
        arrives, matching ``queue.Queue.get``.

        Raises:
            queue.Empty: no item became available in time.
            ValueError: ``timeout`` is negative.
        """
        if not block:
            timeout = 0
        if timeout is not None:
            if timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            # NOTE(review): with several concurrent consumers another one may
            # take the byte between this poll and the recv() below, in which
            # case recv() still blocks.
            ready, _, _ = select.select([self._getsocket], [], [], timeout)
            if not ready:
                raise queue.Empty
        # Consume the signalling byte first; it is only sent after the item
        # has been queued, so the item is guaranteed to be there.
        self._getsocket.recv(1)
        return super().get()

    def close(self):
        """Close both ends of the signalling socket pair."""
        self._putsocket.close()
        self._getsocket.close()
@contextmanager
def ObserveUdev(callback, context=None, subsystem=None):
    """Context manager that reports udev device events to *callback*.

    On entry a pyudev ``MonitorObserver`` is started and ``callback("add",
    device)`` is invoked for every device currently listed by the context.
    While the ``with`` body runs, ``callback(device.action, device)`` is
    invoked for each monitored event. On exit the observer is stopped and
    ``callback("remove", device)`` is invoked for every device still listed.

    Args:
        callback: callable taking ``(action, device)``.
        context: pyudev ``Context`` to use; a new one is created when None.
        subsystem: optional subsystem name used to filter both the monitor
            and the device enumeration.

    Yields:
        The running ``MonitorObserver``.
    """
    import traceback
    from pyudev import Context, Monitor, MonitorObserver

    if context is None:
        context = Context()
    monitor = Monitor.from_netlink(context)
    if subsystem is not None:
        monitor.filter_by(subsystem=subsystem)

    def safecallback(action, device):
        try:
            # do not rely on device.action in callback, as it will not be set when adding existing devices on startup
            # and removing remaining devices on cleanup
            callback(action, device)
        except Exception:
            # Only ordinary errors are reported and swallowed here;
            # KeyboardInterrupt / SystemExit are allowed to propagate.
            print('Unhandled exception raised while handling udev event {} on device {}: '.format(action, device.device_node))
            traceback.print_exc()

    def handleevent(device):
        safecallback(device.action, device)

    observer = MonitorObserver(monitor, callback=handleevent, name='monitor-observer')
    observer.start()
    try:
        # fire add events for all devices already connected; done inside the
        # try so the observer is stopped even if enumeration fails
        for device in context.list_devices(subsystem=subsystem):
            safecallback("add", device)
        yield observer
    finally:
        observer.stop()
        # fire remove events for all devices still connected
        for device in context.list_devices(subsystem=subsystem):
            safecallback("remove", device)
class CancelledException(Exception):
    """Raised by cancelableselect() when the calling thread's control queue has a message."""
# Per-thread storage: CancelableThread.run() stores the running thread's
# control queue here so that cancelableselect() can find it.
threadlocal = threading.local()
class CancelableThread(threading.Thread):
    """Thread whose target can be interrupted while it waits in cancelableselect().

    Each instance owns a PollableQueue used as a control channel. ``run()``
    publishes that queue in the module-level ``threadlocal`` storage, and
    ``cancel()`` puts a message on it.

    Args:
        target: zero-argument callable to run in the thread; when None the
            thread does nothing, like a plain ``threading.Thread``.
        name: optional thread name, forwarded to ``threading.Thread``.
        daemon: optional daemon flag, forwarded to ``threading.Thread``.
    """

    def __init__(self, target=None, *, name=None, daemon=None):
        super().__init__(name=name, daemon=daemon)
        self._canceling = False  # set once cancel() has been requested
        self._cancelled = False  # set once the target exited via CancelledException
        self._control = PollableQueue()
        self._real_target = target

    def run(self):
        """Publish the control queue for this thread, then call the target."""
        threadlocal._control = self._control
        if self._real_target is None:
            # Nothing to run; avoids calling None.
            return
        try:
            self._real_target()
        except CancelledException:
            self._cancelled = True
            print("Uncaught CancelledException thrown in thread {}".format(self))
            import traceback
            traceback.print_exc()

    def cancel(self):
        """Ask the thread to stop by putting a message on its control queue."""
        self._canceling = True
        self._control.put("cancel")
class BadThreadError(Exception):
    """Raised by cancelableselect() when the calling thread has no thread-local
    control queue, i.e. it is not running a CancelableThread's target function.
    """
def cancelableselect(rfds, wfds, xfds, timeout=None):
    """Cancelable variant of ``select.select()``.

    The calling thread's control queue (found in ``threadlocal``) is added
    to the read set. If it turns out readable, one message is taken from it
    and CancelledException is raised; otherwise the three lists returned by
    ``select.select()`` are passed back unchanged.

    Raises:
        BadThreadError: the calling thread has no control queue.
        CancelledException: a message was waiting on the control queue.
    """
    import select

    if not hasattr(threadlocal, "_control"):
        raise BadThreadError("warning, cancelableselect can't find thread-local _control queue")

    control = threadlocal._control
    readable, writable, exceptional = select.select(
        [*rfds, control], wfds, xfds, timeout
    )
    if control in readable:
        # Take the message off the queue; its content is not inspected.
        control.get()
        raise CancelledException()
    return readable, writable, exceptional
@contextmanager
def PerDeviceThreadPool(target, subsystem, filter=None):
    """Context manager that runs one CancelableThread per device node.

    Uses ObserveUdev on *subsystem*. For every "add" of a device that has a
    device node, a thread is started that calls ``target(device)``. For
    every "remove", the matching thread is cancelled and joined.

    Args:
        target: callable taking the device; run in the per-device thread.
        subsystem: udev subsystem name passed to ObserveUdev.
        filter: accepted but currently unused.

    Yields:
        The observer yielded by ObserveUdev.
    """
    threads = {}

    def udevcallback(action, device):
        # Read the node once so every lookup below uses the same key.
        node = device.device_node
        if node is None:
            return

        if action == "add":
            existing = threads.get(node)
            if existing is not None:
                print("thread {} already exists for added device {} in udevcallback()!".format(existing, node))
                return

            def threadmain():
                print("calling {} with device {} in per-device thread {}".format(target, device, threading.current_thread()))
                try:
                    target(device)
                finally:
                    print("done with device {} in per-device thread {}".format(device, threading.current_thread()))
                    # pop() instead of del: never raises if the entry is gone.
                    threads.pop(node, None)

            th = CancelableThread(threadmain)
            threads[node] = th
            th.start()
            print("spawned thread {} for added device {}".format(th, device))
        elif action == "remove":
            # Single lookup: the worker may remove itself at any moment, so a
            # separate membership test followed by indexing could KeyError.
            thr = threads.get(node)
            if thr is not None:
                print("canceling thread {} for removed device {}".format(thr, device))
                thr.cancel()
                thr.join()
                print("cancelled thread for removed device {}".format(node))

    print("starting udev observer")
    with ObserveUdev(udevcallback, None, subsystem) as observer:
        yield observer
    print("cleaning up udev observer")
'''
Example application which watches for input devices being added and removed, and spawns
threads to read each active device, aggregating all the input into a single
queue for the main thread to process.
'''
# Queue shared by all reader threads; the main loop below takes items from it.
aggregate = PollableQueue()
def readerthread(device):
    """Read bytes from *device*'s node and forward them to ``aggregate``.

    Each byte read is queued as ``[device.device_node, <one-byte bytes>]``.
    The loop ends quietly when the thread is cancelled or the file reports
    end-of-file; any other error is printed.
    """
    try:
        # Unbuffered on purpose: with a buffered reader, bytes already pulled
        # into Python's buffer are invisible to select(), so they would sit
        # there until the device produced more data.
        with open(device.device_node, "rb", buffering=0) as fd:
            while True:
                r, w, x = cancelableselect([fd], [], [])
                if fd not in r:
                    continue
                chunk = fd.read(4096)
                if not chunk:
                    # End of file: stop instead of spinning on a descriptor
                    # that stays readable forever.
                    break
                for byte in chunk:
                    aggregate.put([device.device_node, bytes((byte,))])
    except CancelledException:
        # Cancellation is the normal way to stop this thread, not an error.
        pass
    except Exception as err:
        print("error reading from {}: {}".format(device, err))
# Demo entry point: print everything the reader threads collect until the
# loop is interrupted, then let the pool shut its threads down.
print("entering program")
with PerDeviceThreadPool(readerthread, "input") as pool:
    try:
        while True:
            print(aggregate.get())
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to leave the loop.
        print("exception thrown in main loop")
    except Exception:
        # Anything else is unexpected: keep the message but show what failed.
        import traceback
        print("exception thrown in main loop")
        traceback.print_exc()
print("exiting program")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment