Skip to content

Instantly share code, notes, and snippets.

@un1tz3r0
Created August 23, 2017 19:16
Show Gist options
  • Save un1tz3r0/ab896e787b95293af08d3a2b3b715583 to your computer and use it in GitHub Desktop.
Save un1tz3r0/ab896e787b95293af08d3a2b3b715583 to your computer and use it in GitHub Desktop.
Python MIDI repeater with dynamic USB hot-plug handling via udev
#!/usr/bin/python
import pyudev
import select
import functools
import re
import os
import codecs
# Bookkeeping for the currently open MIDI devices, kept in two tables:
#   devs: device node path (e.g. "/dev/midi1") -> open file descriptor
#   fds:  open file descriptor -> bytearray of bytes still waiting to be
#         written to that descriptor
fds = {}
devs = {}


def handle_udev_event(action, device):
    """Open or close a MIDI device node in response to a udev event.

    Only devices whose ``device_node`` matches ``/dev/midi<N>`` are
    handled; anything else (including devices without a node) is ignored.

    Args:
        action: The udev action string. ``"add"`` opens the node and starts
            tracking it, ``"remove"`` closes it and forgets it. Any other
            action is ignored.
        device: An object exposing a ``device_node`` attribute (the device
            node path, or a falsy value when the device has none).

    Failures to open or close a descriptor are reported on stdout instead
    of being raised, so one misbehaving device cannot take down the caller.
    """
    node = device.device_node
    if not node or not re.match(r"^/dev/midi[0-9]+$", node):
        return

    if action == "remove":
        # Forget the device *before* closing it so that a failing close()
        # cannot leave stale entries behind in either table.
        fd = devs.pop(node, None)
        if fd is None:
            return
        fds.pop(fd, None)
        try:
            os.close(fd)
        except OSError as err:
            print("removed "+node+" but closing fd "+str(fd)+" failed: "+str(err))
        else:
            print("removed "+node+" closed fd "+str(fd))
    elif action == "add":
        if node in devs:
            # Already open; duplicate "add" events are harmless.
            return
        try:
            fd = os.open(node, os.O_RDWR)
        except OSError as err:
            # Permission denied, device busy, node already gone again, ...
            print("could not open "+node+": "+str(err))
            return
        devs[node] = fd
        fds[fd] = bytearray()
        print("added "+node+" opened fd "+str(fd))
def _drop_fd(fd, reason):
    """Stop tracking descriptor *fd* after an I/O failure and close it.

    Removes the descriptor from both ``fds`` and ``devs`` so that a later
    udev "remove" event for the same node is a no-op and a later "add"
    event can reopen it.
    """
    fds.pop(fd, None)
    for node in [n for n, d in devs.items() if d == fd]:
        del devs[node]
    try:
        os.close(fd)
    except OSError:
        pass  # Best effort: the descriptor may already be invalid.
    print("dropped descriptor %d: %s" % (fd, reason))


context = pyudev.Context()

# Start listening for hotplug events *before* enumerating the devices that
# are already present, so a device plugged in between the two steps is not
# missed. handle_udev_event() ignores duplicate "add" events.
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by(subsystem='sound')
monitor.enable_receiving()

for device in context.list_devices(subsystem='sound'):
    handle_udev_event('add', device)

# Main loop: every byte read from one MIDI device is queued for writing to
# all the other open devices; udev events add and remove devices on the fly.
while True:
    monitor_fd = monitor.fileno()
    reading = [monitor_fd] + list(fds)
    # Only wait for writability on descriptors that have queued output.
    writing = [fd for fd, pending in fds.items() if pending]
    readable, writable, exceptional = select.select(reading, writing, reading)

    hotplug = monitor_fd in readable
    if hotplug:
        readable.remove(monitor_fd)

    # Device I/O is done before hotplug handling: every descriptor select()
    # reported is then still the one we asked about. Handling hotplug first
    # could close a descriptor (or reuse its number for a new device) while
    # it is still listed in readable/writable.
    for f in writable:
        if f not in fds:
            continue  # dropped earlier in this iteration
        try:
            n = os.write(f, fds[f])
        except OSError as err:
            _drop_fd(f, err)
            continue
        print("Wrote %d bytes to descriptor %d: %s" % (n, f, codecs.encode(bytes(fds[f][:n]), 'hex')))
        del fds[f][:n]

    for f in readable:
        if f not in fds:
            continue  # dropped earlier in this iteration
        try:
            b = os.read(f, 1024)
        except OSError as err:
            # Typically the device was unplugged.
            _drop_fd(f, err)
            continue
        if not b:
            # A zero-length read means end of file; keeping the descriptor
            # would make select() return immediately forever.
            _drop_fd(f, "end of file")
            continue
        print("Read %d bytes from descriptor %d: %s" % (len(b), f, codecs.encode(bytes(b), 'hex')))
        # Repeat the data to every device except the one it came from.
        for g, pending in fds.items():
            if g != f:
                pending.extend(b)

    if hotplug:
        # Drain all queued udev events; poll(0) returns None once empty.
        for udev in iter(functools.partial(monitor.poll, 0), None):
            handle_udev_event(udev.action, udev)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment