Skip to content

Instantly share code, notes, and snippets.

@SeanPesce
Last active June 17, 2022 16:28
Show Gist options
  • Save SeanPesce/d2d788ca426f24d2c88848f32cd4a21c to your computer and use it in GitHub Desktop.
Save SeanPesce/d2d788ca426f24d2c88848f32cd4a21c to your computer and use it in GitHub Desktop.
Python 3 classes for USB bulk device I/O
#!/usr/bin/env python3
# Author: Sean Pesce
# Installing prerequisites:
# sudo pip3 install pyusb
#
# On Windows, you also need to install libusb:
# https://sourceforge.net/projects/libusb-win32/files/libusb-win32-releases/
# Then, use inf-wizard.exe to create and install a libusb driver for the device.
# Note: this requires installation of an unsigned driver.
# On Linux, this script must be run as root.
# References:
# https://github.com/pyusb/pyusb/blob/master/docs/tutorial.rst
import usb.core
import usb.util
class USB_Device:
def __init__(self, vendor_id, product_id, name='USB Device', config_id=None):
self.vendor_id = vendor_id
self.product_id = product_id
self.product_name = name
self.dev = usb.core.find(idVendor=vendor_id, idProduct=product_id)
assert self.dev is not None, f'Failed to find {self.product_name}'
self.dev.reset()
# Set the active configuration. With no arguments, the first configuration will be the active one
if config_id is None:
self.dev.set_configuration()
else:
self.dev.set_configuration(config_id)
self.cfg = self.dev.get_active_configuration()
self.iface = self.cfg[(0,0)]
def reset(self):
return self.dev.reset()
class USB_Bulk_Device(USB_Device):
def __init__(self, vendor_id, product_id, name='USB Bulk Device', config_id=None):
super().__init__(vendor_id, product_id, name, config_id)
self.in_endp = usb.util.find_descriptor(
self.iface,
# Match the first IN endpoint
custom_match = lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN)
assert self.in_endp is not None, 'Failed to find IN endpoint'
self.out_endp = usb.util.find_descriptor(
self.iface,
# Match the first OUT endpoint
custom_match = lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT)
assert self.out_endp is not None, 'Failed to find OUT endpoint'
def clear_halt(self):
self.dev.clear_halt(self.out_endp)
self.dev.clear_halt(self.in_endp)
def write(self, data, encoding='utf8'):
if type(data) == str:
data = data.encode(encoding)
data = bytearray(data)
written = 0 # Total bytes written
wrote_once = False # Flag to allow writing an empty message
while data or not wrote_once:
nwrite = self.dev.write(self.out_endp.bEndpointAddress, data[:self.out_endp.wMaxPacketSize])
data = data[nwrite:]
wrote_once = True
written += nwrite
return written
def read(self, size, timeout=None):
"""
Attempts to read the specified number of bytes from the USB device. This function does NOT
guarantee that the specified number of bytes will be returned. Multiple reads are performed
ONLY IF the read size is greater than the maximum packet size of the endpoint.
"""
data = bytearray()
read_more = True
while read_more:
remaining = size - len(data)
if remaining > self.in_endp.wMaxPacketSize:
read_more = True
else:
read_more = False
read_data = self.dev.read(self.in_endp.bEndpointAddress, min(self.in_endp.wMaxPacketSize, remaining), timeout)
data += read_data
return data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment