Skip to content

Instantly share code, notes, and snippets.

@mentha
Created May 3, 2022 13:36
Show Gist options
  • Save mentha/c368bb4d1c1a969765775ba439f3239e to your computer and use it in GitHub Desktop.
Save mentha/c368bb4d1c1a969765775ba439f3239e to your computer and use it in GitHub Desktop.
assign a usb port to libvirt guest
#!/usr/bin/env python3
from argparse import ArgumentParser
from traceback import print_exc
import libvirt
import os
import re
import subprocess as sp
class PortForward:
USBSYS = '/sys/bus/usb/devices'
def parse_args(self):
a = ArgumentParser(description='attach usb port to guests')
a.add_argument('guest', help='guest name')
a.add_argument('port', nargs='+', help='usb port in format of "BUS-PORT[.PORT]*"')
a = a.parse_args()
self.guest_name = a.guest
self.ports = set(a.port)
def main(self):
self.parse_args()
self.conn = libvirt.open()
self.guest = self.conn.lookupByName(self.guest_name)
mon = sp.Popen(['udevadm', 'monitor', '-s', 'usb', '-k'], stdin=sp.DEVNULL, stdout=sp.PIPE, universal_newlines=True)
re_devm = re.compile(r'\s+(add|remove)\s+\S+/(\d+-\d+(\.\d+)*)\s')
try:
while True:
l = mon.stdout.readline()
if not l:
break
m = re_devm.search(l)
if m is None:
continue
print(l.strip())
self.devchange(m[1], m[2])
finally:
mon.terminate()
def readall(self, *a, **ka):
with open(*a, **ka) as f:
return f.read()
def usbprop(self, dev, prop, *a, **ka):
return self.readall(os.path.join(self.USBSYS, dev, prop), *a, **ka).strip()
def devchange(self, action, path):
try:
if int(self.usbprop(path, 'bDeviceClass'), 16) == 0x09:
return # is a hub
match = False
for p in self.ports:
if path == p or path.startswith(p + '.'):
match = True
break
if not match:
return
if action == 'add':
self.devadd(path)
elif action == 'remove':
self.devdel(path)
else:
print(f'unhandled action {action}')
except FileNotFoundError:
pass
except:
print_exc()
def devxml(self, path):
bus, port = path.split('-', 1)
return ('<hostdev mode="subsystem" type="usb" managed="yes">'
'<source>'
f'<vendor id="0x{self.usbprop(path, "idVendor")}"/>'
f'<product id="0x{self.usbprop(path, "idProduct")}"/>'
#f'<address type="usb" bus="0x{bus}" port="{port}"/>'
'</source>'
'</hostdev>')
def devadd(self, path):
print(f'attaching {path}')
self.guest.attachDevice(self.devxml(path))
def devdel(self, path):
print(f'detaching {path}')
self.guest.detachDevice(self.devxml(path))
if __name__ == '__main__':
PortForward().main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment