Created
August 26, 2018 20:38
-
-
Save un1tz3r0/9eb81b1577b0bfd7db91661dfe63c513 to your computer and use it in GitHub Desktop.
This is a sample implementation of a linux udev monitor to watch for devices being added and removed and spawns/stops a reader thread for each active device. The classes are reusable and some are very useful patterns that are common in asynchronous programs. The simple example application at the end demonstrates reading from all connected input …
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from contextlib import contextmanager | |
import pyudev | |
from select import select | |
import threading | |
import select, os, queue, socket | |
class PollableQueue(queue.Queue): | |
def __init__(self): | |
super().__init__() | |
# Create a pair of connected sockets | |
if os.name == 'posix': | |
self._putsocket, self._getsocket = socket.socketpair() | |
else: | |
# Compatibility on non-POSIX systems | |
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
server.bind(('127.0.0.1', 0)) | |
server.listen(1) | |
self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
self._putsocket.connect(server.getsockname()) | |
self._getsocket, _ = server.accept() | |
server.close() | |
def fileno(self): | |
return self._getsocket.fileno() | |
def put(self, item): | |
super().put(item) | |
self._putsocket.send(b'x') | |
def get(self): | |
self._getsocket.recv(1) | |
return super().get() | |
@contextmanager | |
def ObserveUdev(callback, context=None, subsystem=None): | |
from pyudev import Context, Monitor, MonitorObserver | |
if context is None: | |
context = Context() | |
monitor = Monitor.from_netlink(context) | |
if subsystem is not None: | |
monitor.filter_by(subsystem=subsystem) | |
def safecallback(action, device): | |
try: | |
# do not rely on device.action in callback, as it will not be set when adding existing devices on startup | |
# and removing remaining devices on cleanup | |
callback(action, device) | |
except: | |
import traceback | |
print('Unhandled exception raised while handling udev event {} on device {}: '.format(action, device.device_node)) | |
traceback.print_exc() | |
def handleevent(device): | |
safecallback(device.action, device) | |
observer = MonitorObserver(monitor, callback=handleevent, name='monitor-observer') | |
observer.start() | |
# fire add events for all devices already connected | |
for device in context.list_devices(subsystem=subsystem): | |
safecallback("add", device) | |
try: | |
yield from (observer,) | |
finally: | |
observer.stop() | |
# fire remove events for all devices already connected | |
for device in context.list_devices(subsystem=subsystem): | |
safecallback("remove", device) | |
class CancelledException(Exception): | |
pass | |
threadlocal = threading.local() | |
class CancelableThread(threading.Thread): | |
def __init__(self, target=None): | |
self._canceling = False | |
self._cancelled = False | |
self._control = PollableQueue() | |
self._real_target = target | |
super().__init__() | |
def run(self): | |
global threadlocal | |
threadlocal._control = self._control | |
try: | |
self._real_target() | |
except CancelledException: | |
print("Uncaught CancelledException thrown in thread {}".format(self)) | |
import traceback | |
traceback.print_exc() | |
def cancel(self): | |
self._control.put("cancel") | |
class BadThreadError(Exception): | |
'''Exception raised by cancelableselect() when invoked outside of a CancelableThread's target function.''' | |
pass | |
def cancelableselect(rfds, wfds, xfds, timeout=None): | |
import select | |
global threadlocal | |
if hasattr(threadlocal, "_control"): | |
rs,ws,xs = select.select(list(rfds)+[threadlocal._control], wfds, xfds, timeout) | |
if threadlocal._control in rs: | |
msg = threadlocal._control.get() | |
raise CancelledException() | |
else: | |
raise BadThreadError("warning, cancelableselect can't find thread-local _control queue") | |
# rs,ws,xs = select.select(rfds, wfds, xfds, timeout) | |
return rs,ws,xs | |
@contextmanager | |
def PerDeviceThreadPool(target, subsystem, filter=None): | |
threads = {} | |
def udevcallback(action, device): | |
nonlocal threads | |
nonlocal target | |
if device.device_node is not None and action == "add": | |
if device.device_node in threads.keys(): | |
print("thread {} already exists for added device {} in udevcallback()!".format(threads[device.device_node], device.device_node)) | |
else: | |
def threadmain(): | |
nonlocal threads, target, device | |
print("calling {} with device {} in per-device thread {}".format(target, device, threading.current_thread())) | |
try: | |
target(device) | |
finally: | |
print("done with device {} in per-device thread {}".format(target, device, threading.current_thread())) | |
del threads[device.device_node] | |
th = CancelableThread(threadmain) | |
threads[device.device_node] = th | |
th.start() | |
print("spawned thread {} for added device {}".format(th, device)) | |
elif action == "remove": | |
if device.device_node is not None and device.device_node in threads.keys(): | |
print("canceling thread {} for removed device {}".format(threads[device.device_node], device)) | |
thr = threads[device.device_node] | |
thr.cancel() | |
thr.join() | |
#del threads[device.device_node] | |
print("cancelled thread for removed device {}".format(device.device_node)) | |
print("starting udev observer") | |
with ObserveUdev(udevcallback, None, subsystem) as observer: | |
yield observer | |
print("cleaning up udev observer") | |
''' | |
Example application which watches for input devices being added and removed, and spawns | |
threads to read each active device, aggregating all the the input into a single | |
queue for the main thread to process. | |
''' | |
aggregate = PollableQueue() | |
def readerthread(device): | |
try: | |
fd = open(device.device_node, "rb") | |
try: | |
while True: | |
r,w,x = cancelableselect([fd], [], []) | |
if fd in r: | |
aggregate.put([device.device_node, fd.read(1)]) | |
finally: | |
fd.close() | |
except Exception as err: | |
print("error reading from {}: {}".format(device, err)) | |
print("entering program") | |
with PerDeviceThreadPool(readerthread, "input") as pool: | |
try: | |
while True: | |
print(aggregate.get()) | |
except: | |
print("exception thrown in main loop") | |
print("exiting program") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment