Created
August 23, 2017 19:16
-
-
Save un1tz3r0/ab896e787b95293af08d3a2b3b715583 to your computer and use it in GitHub Desktop.
Python MIDI repeater with dynamic USB hot-plug handling via udev
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import pyudev | |
import select | |
import functools | |
import re | |
import os | |
import codecs | |
# Pending output per open file descriptor: bytes read from the other devices
# that still have to be written to this descriptor.
fds = {}
# Open file descriptor for each tracked device node path (e.g. "/dev/midi1").
devs = {}


def handle_udev_event(action, device):
    """Start or stop tracking a MIDI device node in response to a udev event.

    Only devices whose ``device_node`` matches ``/dev/midi<N>`` are handled;
    everything else is ignored.

    Args:
        action: The udev action string. ``"add"`` opens the node and registers
            it in ``devs``/``fds``; ``"remove"`` closes and unregisters it.
            Any other action is ignored.
        device: An object exposing a ``device_node`` attribute (a pyudev
            device in the main script).

    Returns:
        None. Failures to open or close the node are reported on stdout
        instead of being raised, so one bad device cannot stop the repeater.
    """
    node = device.device_node
    if not node:
        return
    if not re.match(u"^/dev/midi[0-9]+$", node):
        return

    if action == "remove":
        if node not in devs:
            return
        # Unregister before closing so the bookkeeping stays consistent even
        # when close() reports an error for the already-vanished device.
        fd = devs.pop(node)
        fds.pop(fd, None)
        try:
            os.close(fd)
        except OSError as err:
            print("error closing fd " + str(fd) + " for " + node + ": " + str(err))
        print("removed " + node + " closed fd " + str(fd))
    elif action == "add":
        if node in devs:
            return
        try:
            fd = os.open(node, os.O_RDWR)
        except OSError as err:
            # Busy, missing, or permission-denied nodes are skipped.
            print("could not open " + node + ": " + str(err))
            return
        devs[node] = fd
        fds[fd] = bytearray(0)
        print("added " + node + " opened fd " + str(fd))
def _drop_descriptor(fd, reason):
    """Stop repeating to/from *fd* after an I/O failure and release it."""
    fds.pop(fd, None)
    for node in [n for n, node_fd in devs.items() if node_fd == fd]:
        del devs[node]
    try:
        os.close(fd)
    except OSError:
        # The descriptor is already invalid; there is nothing left to release.
        pass
    print("dropped descriptor %d: %s" % (fd, reason))


context = pyudev.Context()

# Start listening BEFORE enumerating so that a device plugged in between the
# two steps is not missed; handle_udev_event ignores duplicate "add" events.
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by(subsystem='sound')
monitor.enable_receiving()

# Pick up the devices that were already present at start-up.
for device in context.list_devices(subsystem='sound'):
    handle_udev_event('add', device)

# Repeater loop: everything read from one device is queued for all the others.
while True:
    monitor_fd = monitor.fileno()
    reading = [monitor_fd] + list(fds)
    # Only wait for writability on descriptors that have queued output.
    writing = [fd for fd, pending in fds.items() if pending]
    readable, writable, exceptional = select.select(reading, writing, reading)

    if monitor_fd in readable:
        # Drain every queued udev event; poll(0) returns None when empty.
        for udev in iter(functools.partial(monitor.poll, 0), None):
            handle_udev_event(udev.action, udev)
        # Hot-plug handling may have closed descriptors (or reused their
        # numbers), so the ready lists are stale: select again.
        continue

    for f in writable:
        if f not in fds:
            continue  # dropped earlier in this pass
        try:
            n = os.write(f, fds[f])
        except OSError as err:
            _drop_descriptor(f, err)
            continue
        print("Wrote %d bytes to descriptor %d: %s" % (n, f, codecs.encode(bytes(fds[f][:n]), 'hex')))
        del fds[f][:n]

    for f in readable:
        if f not in fds:
            continue  # dropped earlier in this pass
        try:
            b = bytearray(os.read(f, 1024))
        except OSError as err:
            _drop_descriptor(f, err)
            continue
        if not b:
            # A zero-length read on a readable descriptor means end-of-file;
            # keeping it would make select() return immediately forever.
            _drop_descriptor(f, "end of file")
            continue
        print("Read %d bytes from descriptor %d: %s" % (len(b), f, codecs.encode(bytes(b), 'hex')))
        for g in fds:
            if g != f:
                fds[g].extend(b)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment