Skip to content

Instantly share code, notes, and snippets.

@todbot
Last active May 8, 2024 23:14
Show Gist options
  • Save todbot/9f056cc8419e36a380c62fcdd6204d22 to your computer and use it in GitHub Desktop.
Save todbot/9f056cc8419e36a380c62fcdd6204d22 to your computer and use it in GitHub Desktop.
Demonstrate trying to use the USBError of usb.core.Device.read()
# Demonstrate trying to use the USBError of usb.core.Device.read() as a
# "no bytes available" indicator. Unfortunately, it's VERY slow.
# Or more likely, it's missing messages mid-parse.
# 8 May 2024 - @todbot
#
# note, this requires two special libraries:
# - adafruit_usb_host_midi_hostreadhack
# - winterbloom_smolmidi_hostreadhack
import time
import board
import usb
import max3421e
import usb_midi
import adafruit_usb_host_midi_hostreadhack as adafruit_usb_host_midi
import winterbloom_smolmidi_hostreadhack as smolmidi
import select
# Timeout in milliseconds; passed below to smolmidi.MidiIn as ``timeout_ms``.
midi_host_timeout_ms = 2
# The MAX3421E USB host chip is driven over the board's default SPI bus,
# using pin D10 as chip select and pin D9 as its interrupt line.
spi = board.SPI()
cs = board.D10
irq = board.D9
host_chip = max3421e.Max3421E(spi, chip_select=cs, irq=irq)
def look_for_midi_usb_device():
    """Scan the USB host port and wrap a connected device as a MIDI port.

    Every device reported by ``usb.core.find(find_all=True)`` is tried in
    turn. Devices whose wrapping raises ``ValueError`` are reported and
    skipped. When several devices can be wrapped, the last one wins.

    Returns the ``adafruit_usb_host_midi.MIDI`` object, or ``None`` when no
    device could be wrapped.
    """
    found = None
    for dev in usb.core.find(find_all=True):
        try:
            found = adafruit_usb_host_midi.MIDI(dev)
            ids = "Found vid/pid %04x/%04x" % (dev.idVendor, dev.idProduct)
            print(ids, dev.manufacturer, dev.product)
        except ValueError:
            print("bad device:", dev)
    return found
# Keep scanning the USB host port until a MIDI device shows up.
midi_usb_device = None
while True:
    print("looking for device")
    midi_usb_device = look_for_midi_usb_device()
    # The pause runs after every scan, including the successful one.
    time.sleep(0.2)
    if midi_usb_device is not None:
        break
print(type(midi_usb_device), type(midi_usb_device.device))
print("Found MIDI device: ", midi_usb_device)

midi_device = smolmidi.MidiIn(midi_usb_device, timeout_ms=midi_host_timeout_ms)

last_ui_time = 0
last_msg = None
while True:
    # Stand-in for UI work: report roughly every 0.3 seconds.
    if time.monotonic() - last_ui_time > 0.3:
        last_ui_time = time.monotonic()
        print("UI goes here:", time.monotonic(), last_msg)

    try:
        msg = midi_device.receive()
    except usb.core.USBError:
        # The timed-out read raises an error that carries no detail; it is
        # treated as "no bytes available" and the loop simply goes around.
        continue

    if msg:
        msg_bytes = bytes(msg)
        print("midi msg:", ["%2x" % v for v in msg_bytes])
        last_msg = msg
# SPDX-FileCopyrightText: Copyright (c) 2023 Scott Shawcroft for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`adafruit_usb_host_midi`
================================================================================
CircuitPython USB host driver for MIDI devices
* Author(s): Scott Shawcroft
"""
import adafruit_usb_host_descriptors
# Package metadata.
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_USB_Host_MIDI.git"
# Direction bit of an endpoint address: MIDI.__init__ stores an endpoint as
# ``in_ep`` when this bit is set and as ``out_ep`` otherwise.
DIR_IN = 0x80
class MIDI:
    """USB host MIDI port exposing a connected device as a MIDI byte stream.

    The constructor walks the device's configuration descriptor looking for
    an interface with class 0x01 / subclass 0x03 and remembers that
    interface's endpoint addresses in ``in_ep`` / ``out_ep``.

    :param device: the ``usb.core.Device`` to wrap.
    :raises ValueError: if the configuration descriptor contains no MIDI
        interface. Callers use this to skip non-MIDI devices.
    """

    # Number of MIDI bytes carried by one 4-byte USB-MIDI event packet,
    # indexed by the Code Index Number (low nibble of the packet's first
    # byte). Codes 0x0 and 0x1 are reserved and contribute no bytes.
    _CIN_DATA_LENGTH = (0, 0, 2, 3, 3, 1, 2, 3, 3, 3, 3, 3, 2, 2, 3, 1)

    def __init__(self, device):
        self.interface_number = 0
        self.in_ep = 0
        self.out_ep = 0
        self.device = device
        self.buf = bytearray(64)
        self.start = 0
        self._remaining = 0

        config_descriptor = adafruit_usb_host_descriptors.get_configuration_descriptor(
            device, 0
        )

        offset = 0
        in_midi_interface = False
        found_midi_interface = False
        total = len(config_descriptor)
        while offset + 1 < total:
            descriptor_len = config_descriptor[offset]
            if descriptor_len == 0:
                # A zero-length descriptor would never advance the offset.
                break
            descriptor_type = config_descriptor[offset + 1]
            if descriptor_type == adafruit_usb_host_descriptors.DESC_INTERFACE:
                interface_class = config_descriptor[offset + 5]
                interface_subclass = config_descriptor[offset + 6]
                # Endpoint descriptors that follow belong to this interface.
                in_midi_interface = interface_class == 0x1 and interface_subclass == 0x3
                if in_midi_interface:
                    found_midi_interface = True
                    self.interface_number = config_descriptor[offset + 2]
            elif descriptor_type == adafruit_usb_host_descriptors.DESC_ENDPOINT:
                if in_midi_interface:
                    endpoint_address = config_descriptor[offset + 2]
                    if endpoint_address & DIR_IN:
                        self.in_ep = endpoint_address
                    else:
                        self.out_ep = endpoint_address
            offset += descriptor_len

        if not found_midi_interface:
            # Leave non-MIDI devices untouched and let the caller skip them.
            raise ValueError("No MIDI interface found")

        device.set_configuration()
        device.detach_kernel_driver(self.interface_number)

    def _fill_buffer(self, timeout_ms):
        """Read one USB transfer and unpack its event packets into ``buf``.

        Each 4-byte packet is a header byte followed by up to three MIDI
        bytes. Only the bytes the header's Code Index Number declares valid
        are kept. They are compacted to the front of ``self.buf`` in place,
        which is safe because a packet never yields more bytes than it
        occupies. A trailing fragment shorter than four bytes is ignored.
        """
        count = self.device.read(self.in_ep, self.buf, timeout_ms)
        buf = self.buf
        lengths = self._CIN_DATA_LENGTH
        out = 0
        for packet in range(0, count - 3, 4):
            valid = lengths[buf[packet] & 0x0F]
            for k in range(1, valid + 1):
                buf[out] = buf[packet + k]
                out += 1
        self.start = 0
        self._remaining = out

    def read(self, size, timeout_ms=0):
        """
        Do a device read and return up to ``size`` MIDI bytes.

        Bytes left over from the previous USB transfer are served first; a
        new transfer is only requested once those are used up. The result
        may be shorter than ``size`` (or empty).

        Supports non-standard timeout_ms param.
        Danger, if timeout_ms is used, this method will throw
        an unlabeled usb.core.USBError if no bytes available.
        Note: usb.core.Device says timeout param is int|None, but really it's int always
        """
        if self._remaining == 0:
            self._fill_buffer(timeout_ms)
        size = max(0, min(size, self._remaining))
        b = self.buf[self.start:self.start + size]
        self.start += size
        self._remaining -= size
        return b

    def readinto(self, buf, timeout_ms=0):
        """Read up to ``len(buf)`` MIDI bytes into the front of ``buf``.

        Supports non-standard timeout_ms param. Returns the number of bytes
        stored; ``buf`` keeps its length even when fewer bytes arrive.
        """
        b = self.read(len(buf), timeout_ms)
        n = len(b)
        if n:
            # Assign only the filled prefix so a short read cannot resize buf.
            buf[:n] = b
        return n

    def __repr__(self):
        # also idProduct/idVendor for vid/pid
        return "MIDI Device " + str(self.device.manufacturer) + "/" + str(self.device.product)
# The MIT License (MIT)
#
# Copyright (c) 2019 Alethea Flowers for Winterbloom
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/theacodes/Winterbloom_SmolMIDI.git"
"""A minimalist MIDI library."""
# Message type constants.
# Channel messages: the low nibble of the status byte carries the channel.
NOTE_OFF = 0x80
NOTE_ON = 0x90
AFTERTOUCH = 0xA0
CONTROLLER_CHANGE = CC = 0xB0
PROGRAM_CHANGE = 0xC0
CHANNEL_PRESSURE = 0xD0
PITCH_BEND = 0xE0
# System messages: the whole status byte is the message type.
SYSTEM_EXCLUSIVE = SYSEX = 0xF0
MTC_QUARTER_FRAME = 0xF1
SONG_POSITION = 0xF2
SONG_SELECT = 0xF3
BUS_SELECT = 0xF5
TUNE_REQUEST = 0xF6
SYSEX_END = 0xF7
CLOCK = 0xF8
TICK = 0xF9
START = 0xFA
CONTINUE = 0xFB
STOP = 0xFC
ACTIVE_SENSING = 0xFE
SYSTEM_RESET = 0xFF
# Message types grouped by the number of data bytes that follow the status
# byte. SYSEX is listed as zero-length because its payload is read separately.
_LEN_0_MESSAGES = {
    TUNE_REQUEST,
    SYSEX,
    SYSEX_END,
    CLOCK,
    TICK,
    START,
    CONTINUE,
    STOP,
    ACTIVE_SENSING,
    SYSTEM_RESET,
}
# MTC Quarter Frame carries one data byte; without this entry that byte would
# be parsed as a stray, status-less byte.
_LEN_1_MESSAGES = {
    PROGRAM_CHANGE,
    CHANNEL_PRESSURE,
    MTC_QUARTER_FRAME,
    SONG_SELECT,
    BUS_SELECT,
}
_LEN_2_MESSAGES = {NOTE_OFF, NOTE_ON, AFTERTOUCH, CC, PITCH_BEND, SONG_POSITION}
def _is_channel_message(status_byte):
    """Return True when ``status_byte`` lies in the channel-message range.

    The range runs from NOTE_OFF up to PITCH_BEND with the highest channel
    nibble (0x0F) added.
    """
    highest_channel_status = PITCH_BEND + 0x0F
    return NOTE_OFF <= status_byte <= highest_channel_status
def _read_n_bytes(port, buf, dest, num_bytes, timeout_ms):
while num_bytes:
if port.readinto(buf, timeout_ms):
dest.append(buf[0])
num_bytes -= 1
class Message:
    """A single MIDI message: message type, optional channel, optional data.

    ``bytes(message)`` yields the status byte (type OR-ed with the channel
    when one is set) followed by the data bytes, if any.
    """

    def __init__(self):
        self.type = self.channel = self.data = None

    def __bytes__(self):
        status = self.type
        if self.channel:
            status = status | self.channel
        payload = self.data or []
        return bytes([status] + [value for value in payload])
class MidiIn:
    """Parses MIDI messages one byte at a time from a port.

    :param port: object providing ``readinto(buf, timeout_ms)``, which
        returns a truthy value when it stored a byte in ``buf``.
    :param timeout_ms: forwarded unchanged to every ``port.readinto`` call.
    :param enable_running_status: when True, a data byte arriving where a
        status byte is expected reuses the last channel status byte.
    """

    def __init__(self, port, timeout_ms=0, enable_running_status=False):
        self._port = port
        self.timeout_ms = timeout_ms
        self._read_buf = bytearray(1)
        self._running_status_enabled = enable_running_status
        self._running_status = None
        self._outstanding_sysex = False
        self._error_count = 0

    @property
    def error_count(self):
        """Number of messages discarded so far because of invalid data."""
        return self._error_count

    def receive(self):
        """Read and return the next message, or ``None``.

        ``None`` is returned when the port has no byte ready, and also when
        invalid data was discarded (``error_count`` is incremented then).
        Exceptions raised by the port propagate to the caller.

        NOTE(review): if the port raises while data bytes are being read,
        the status byte already consumed is not kept for the next call --
        presumably this drops that message; confirm against the port used.
        """
        # Before we do anything, check and see if there's an unprocessed
        # sysex message pending. If so, throw it away. The caller has
        # to call receive_sysex if they care about the bytes.
        if self._outstanding_sysex:
            self.receive_sysex(0)

        # Read the status byte for the next message.
        result = self._port.readinto(self._read_buf, self.timeout_ms)

        # No message ready.
        if not result:
            return None

        message = Message()
        data_bytes = bytearray()

        # Is this a status byte?
        status_byte = self._read_buf[0]
        is_status = status_byte & 0x80

        # If not, see if we have a running status byte.
        if not is_status:
            if self._running_status_enabled and self._running_status:
                # Keep data_bytes a bytearray so message.data has one type
                # on every path.
                data_bytes = bytearray([status_byte])
                status_byte = self._running_status
            # If not a status byte and no running status, this is
            # invalid data.
            else:
                self._error_count += 1
                return None

        # Is this a channel message, if so, let's figure out the right
        # message type and set the message's channel property.
        if _is_channel_message(status_byte):
            # Only set the running status byte for channel messages.
            self._running_status = status_byte
            # Mask off the channel nibble.
            message.type = status_byte & 0xF0
            message.channel = status_byte & 0x0F
        else:
            message.type = status_byte

        # Read the appropriate number of bytes for each message type.
        if message.type in _LEN_2_MESSAGES:
            _read_n_bytes(
                self._port, self._read_buf, data_bytes, 2 - len(data_bytes), self.timeout_ms
            )
            message.data = data_bytes
        elif message.type in _LEN_1_MESSAGES:
            _read_n_bytes(
                self._port, self._read_buf, data_bytes, 1 - len(data_bytes), self.timeout_ms
            )
            message.data = data_bytes

        # If this is a sysex message, set the pending sysex flag so we
        # can throw the message away if the user doesn't process it.
        if message.type == SYSEX:
            self._outstanding_sysex = True

        # Check the data bytes for corruption. If the data bytes have any status bytes
        # embedded, it probably means the buffer overflowed. Either way, discard the
        # message.
        # TODO: Figure out a better way to detect and deal with this upstream.
        for b in data_bytes:
            if b & 0x80:
                self._error_count += 1
                return None

        return message

    def receive_sysex(self, max_length):
        """Receives the next outstanding sysex message.

        Returns a tuple: the first item is the bytearray of the
        sysex message (at most ``max_length`` bytes). The second is a
        boolean that indicates if the message was truncated or not, i.e.
        whether any payload byte had to be thrown away.

        This must only be called after getting a sysex message from
        receive and must be called before invoking receive again.
        """
        self._outstanding_sysex = False

        out = bytearray()
        buf = bytearray(1)
        truncated = False

        # This reads one byte at a time so we don't read past the
        # end byte. There may be more efficient ways to do this
        # but sysex messages should be relatively rare in practice,
        # so I'm not sure how much benefit we'll get.
        while True:
            # Use the same timeout as receive(), and skip reads that
            # delivered nothing so a stale byte is never processed twice.
            if not self._port.readinto(buf, self.timeout_ms):
                continue
            if buf[0] == SYSEX_END:
                break
            if len(out) < max_length:
                out.append(buf[0])
            else:
                # Past the limit: keep reading up to SYSEX_END but drop
                # the byte.
                truncated = True

        return out, truncated
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment