Created
July 11, 2020 10:39
-
-
Save SebbiUltimate/bcf61efb9be5bbfc5161097e0d998c9a to your computer and use it in GitHub Desktop.
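Unraid USB Device Auto Attach Tool - Create usbdevices.cfg in the same folder as the script, containing the device names one per line (alternative names for the same device may be listed on one line, separated by ';').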
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python
# Unraid USB Device Auto Attach Tool, Made By SebbiUltimate 2020
import sys | |
import os | |
import re | |
import subprocess | |
import xml.etree.ElementTree as ET | |
def getDeviceCfg():
    """Load usbdevices.cfg from the directory containing the running script.

    Every line of the file is split on ';' and each resulting field is
    stripped of surrounding whitespace.

    Returns:
        A list with one entry per line of the file; each entry is the list
        of stripped fields found on that line.
    """
    script_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
    cfg_path = script_dir + '/usbdevices.cfg'
    with open(cfg_path) as cfg_file:
        return [
            [field.strip() for field in row.split(';')]
            for row in cfg_file
        ]
def getVMDevices(vmname):
    """Return the USB host devices currently passed through to a VM.

    Runs ``virsh dumpxml <vmname>`` and collects the ``<address>`` of every
    ``<hostdev type='usb'>`` element found under ``<devices>``.

    The domain XML is read directly from virsh's standard output and the VM
    name is passed as its own argument, so names containing spaces or shell
    metacharacters are handled correctly and no temporary file is written.

    Args:
        vmname: Name of the libvirt domain to inspect.

    Returns:
        A list of dicts ``{'bus': ..., 'device': ...}`` holding the attribute
        values exactly as they appear in the domain XML.

    Raises:
        subprocess.CalledProcessError: If virsh exits with a non-zero status.
    """
    domain_xml = subprocess.check_output(["virsh", "dumpxml", vmname])
    root = ET.fromstring(domain_xml)
    vmdevs = []
    for usbdev in root.findall("./devices/hostdev[@type='usb']"):
        address = usbdev.find("./source/address")
        if address is None:
            # A hostdev without a bus/device address cannot be compared
            # with the host device list, so it is skipped instead of
            # crashing on a missing element.
            continue
        vmdevs.append({'bus': address.get('bus'), 'device': address.get('device')})
    return vmdevs
def getHostDevices():
    """List the USB devices of the host as reported by ``lsusb``.

    Returns:
        A list of dicts, one per parsed lsusb line, with the keys:
        ``bus`` and ``device`` (numbers as strings without leading zeros),
        ``id`` (the ``vendor:product`` pair) and ``tag`` (the remaining
        description text, stripped). Lines that do not match the expected
        format are ignored.

    Raises:
        subprocess.CalledProcessError: If lsusb exits with a non-zero status.
    """
    # Raw string so that \s and \d reach the regex engine untouched.
    device_re = re.compile(r"Bus\s+(?P<bus>\d+)\s+Device\s+(?P<device>\d+).+ID\s(?P<id>\w+:\w+)\s(?P<tag>.+)$", re.I)
    output = subprocess.check_output("lsusb")
    if not isinstance(output, str):
        # Python 3 returns bytes; Python 2 already returns str.
        output = output.decode('utf-8', 'replace')
    devices = []
    for line in output.split('\n'):
        info = device_re.match(line)
        if not info:
            continue
        dinfo = info.groupdict()
        # Normalise "001" -> "1" so the values use plain decimal notation.
        dinfo['bus'] = str(int(dinfo['bus']))
        dinfo['device'] = str(int(dinfo['device']))
        dinfo['tag'] = dinfo['tag'].strip()
        devices.append(dinfo)
    return devices
def getRunningVMs():
    """Return the names of the domains listed by ``virsh list``.

    ``virsh list --name`` prints one domain name per line, so names that
    contain spaces are returned intact instead of being cut at the first
    whitespace of a table column.

    Returns:
        A list of domain names; empty when virsh lists no domains.

    Raises:
        subprocess.CalledProcessError: If virsh exits with a non-zero status.
    """
    output = subprocess.check_output(["virsh", "list", "--name"])
    if not isinstance(output, str):
        # Python 3 returns bytes; Python 2 already returns str.
        output = output.decode('utf-8', 'replace')
    return [name.strip() for name in output.split('\n') if name.strip()]
def writeDevXML(bus,device):
    """Write a libvirt ``<hostdev>`` snippet for one USB device to /tmp.

    Args:
        bus: USB bus number of the device (string or integer).
        device: USB device number on that bus (string or integer).

    Returns:
        The path of the written file, ``/tmp/usbdev_<bus>-<device>.xml``.
    """
    bus = str(bus)
    device = str(device)
    devxml = "<hostdev mode='subsystem' type='usb'><source><address bus='"+bus+"' device='"+device+"'/></source></hostdev>"
    path = "/tmp/usbdev_"+bus+"-"+device+".xml"
    # The context manager closes the file even if the write fails.
    with open(path, "w") as f:
        f.write(devxml)
    return path
def attachDevice(VM,bus,device):
    """Attach a host USB device to a VM via ``virsh attach-device``.

    The device description file is produced by writeDevXML; whatever virsh
    writes to standard output is printed.

    Args:
        VM: Name of the target domain.
        bus: USB bus number of the device.
        device: USB device number on that bus.
    """
    xml_path = writeDevXML(bus, device)
    command = ["virsh", "attach-device", VM, xml_path]
    virsh_output = subprocess.check_output(command)
    print(virsh_output)
def detachDevice(VM,bus,device):
    """Detach a host USB device from a VM via ``virsh detach-device``.

    The device description file is produced by writeDevXML; whatever virsh
    writes to standard output is printed.

    Args:
        VM: Name of the domain the device is removed from.
        bus: USB bus number of the device.
        device: USB device number on that bus.
    """
    xml_path = writeDevXML(bus, device)
    command = ["virsh", "detach-device", VM, xml_path]
    virsh_output = subprocess.check_output(command)
    print(virsh_output)
class VirshXMLParser(object):
    """Small helper for querying an XML tree with ElementTree paths.

    ``root`` is the element all queries start from. It is loaded from
    ``xmlfile`` when one is given, and may also be assigned directly to run
    queries against a sub-tree.
    """

    def __init__(self, xmlfile=None):
        """Create a parser, optionally loading ``xmlfile`` as the root."""
        self.root = None
        if xmlfile:
            self.root = ET.parse(xmlfile).getroot()

    def get_attrib(self, attr, **args):
        """Return attribute ``attr`` of the first element matching ``args``.

        ``args`` are forwarded to get_elements. Returns the string 'None'
        when no element matches.
        """
        element = self.get_elements(**args)
        if element is None:
            return 'None'
        return element.get(attr)

    def get_elements(
            self,
            path=None,
            tag=None,
            attrib=None,
            value=None,
            ns=None,
            listout=False):
        """Find elements below ``root``.

        The search expression is ``path`` (or '.' when no path is given)
        with ``tag`` appended verbatim, so a path used together with a tag
        should end in '/'. When both ``attrib`` and ``value`` are truthy, an
        ``[@attrib='value']`` filter is appended.

        Args:
            path: Leading part of the search expression.
            tag: Tag name appended to ``path``.
            attrib: Attribute name to filter on.
            value: Required value of ``attrib``.
            ns: Optional prefix-to-namespace mapping passed to findall.
            listout: Return all matches as a list instead of the first one.

        Returns:
            With ``listout``: the list of matching elements (possibly
            empty). Otherwise the first match, or None when nothing
            matches. A parser whose ``root`` is still None matches nothing.
        """
        search = path if path else '.'
        if tag:
            search = search + tag
        if attrib and value:
            search = "%s[@%s='%s']" % (search, attrib, value)
        if self.root is None:
            elements = []
        else:
            elements = self.root.findall(search, ns)
        if listout:
            return elements
        return elements[0] if elements else None

    def get_text(self, **args):
        """Return the text of the first element matching ``args``.

        ``args`` are forwarded to get_elements. Returns the string 'None'
        when no element matches.
        """
        element = self.get_elements(**args)
        if element is None:
            return 'None'
        return element.text
def main():
    """Bring the USB passthrough of every running VM in line with usbdevices.cfg.

    For each running VM, the configured device names are looked up among the
    host devices reported by getHostDevices. Each config line may list
    several alternative names; the first one found on the host is used. A
    host device is removed from the pool once it has been assigned, so it is
    not assigned twice.

    When the devices currently attached to a VM differ from the configured
    ones, all of its USB devices are queued for detaching and the configured
    ones are queued for attaching. All detaches run before any attach.
    """
    print("**Unraid USB Device Auto Attach Tool**")
    hostdevs = getHostDevices()
    attachList = []
    detachList = []
    for VM in getRunningVMs():
        print("Processing VM "+VM)
        shoulddevices = []
        for dns in getDeviceCfg():
            for dn in dns:
                devnum = next((idx for idx, item in enumerate(hostdevs) if item["tag"] == dn), None)
                # Compare against None explicitly: index 0 is a valid match
                # and must not be treated as "not found".
                if devnum is not None:
                    dev = hostdevs.pop(devnum)
                    # Keep only the keys getVMDevices reports, so the two
                    # lists can be compared directly.
                    shoulddevices.append({'bus': dev['bus'], 'device': dev['device']})
                    break
        vmdevices = getVMDevices(VM)
        if vmdevices != shoulddevices:
            print("Devices of "+VM+" are not matching the configured Device List, reattaching...")
            for dev in vmdevices:
                print("Adding USB device "+dev['bus']+"-"+dev['device']+" from "+VM+" to detach list.")
                dev['VM'] = VM
                detachList.append(dev)
            for dev in shoulddevices:
                print("Adding USB device "+dev['bus']+"-"+dev['device']+" from "+VM+" to attach list.")
                dev['VM'] = VM
                attachList.append(dev)
        else:
            print("Devices of "+VM+" are OK")
    for dev in detachList:
        # Report the VM stored with the device, not the last VM processed.
        print("Detaching USB device "+dev['bus']+"-"+dev['device']+" from "+dev['VM'])
        detachDevice(dev['VM'], dev['bus'], dev['device'])
    for dev in attachList:
        print("Attaching USB device "+dev['bus']+"-"+dev['device']+" to "+dev['VM'])
        attachDevice(dev['VM'], dev['bus'], dev['device'])
    print("Finished.")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment