Skip to content

Instantly share code, notes, and snippets.

@psobot
Last active November 10, 2021 21:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save psobot/973dacfdc8699eff7d3e496897ba19d8 to your computer and use it in GitHub Desktop.
Save psobot/973dacfdc8699eff7d3e496897ba19d8 to your computer and use it in GitHub Desktop.
MIDI Sample Dump Receiver
"""
MIDI Sample Dump receive script
by Peter Sobot, Nov 6, 2021
@psobot / github@petersobot.com
Requirements: pip install python-rtmidi tqdm numpy pysoundfile
"""
import argparse
import rtmidi
from tqdm import tqdm
import numpy as np
import soundfile as sf
from typing import Tuple
HEADER_TAG = 0x01
PACKET_TAG = 0x02
def get_midi_device_named(midi_identifier: str) -> Tuple[rtmidi.MidiIn, rtmidi.MidiOut]:
midi_out = rtmidi.MidiOut()
midi_in = rtmidi.MidiIn(queue_size_limit=8192)
midi_in.ignore_types(sysex=False)
in_port_names = set(midi_in.get_ports())
out_port_names = set(midi_out.get_ports())
bidirectional_port_names = in_port_names & out_port_names
if not bidirectional_port_names:
raise ValueError("No bidirectional MIDI interface found. Is one connected?")
if midi_identifier:
matching_port_names = [
port_name
for port_name in bidirectional_port_names
if midi_identifier.lower() in port_name.lower()
]
if not matching_port_names:
raise ValueError(
f"No bidirectional MIDI interface containing {midi_identifier}"
f" found (options: {bidirectional_port_names})."
)
port_name = matching_port_names[0]
else:
port_name = bidirectional_port_names.pop()
return midi_in.get_ports().index(port_name), midi_out.get_ports().index(port_name)
def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i : i + n]
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--interface-name", help="The name of the MIDI interface to use for dumping.", default="Uno"
)
parser.add_argument("--output-file", required=True, help="The output file to dump WAV data to.")
args = parser.parse_args()
if not args.output_file.endswith(".wav"):
raise ValueError("Output filename must end in '.wav'!")
midi_in_port, midi_out_port = get_midi_device_named(args.interface_name)
midiout = rtmidi.MidiOut()
midiout.open_port(midi_out_port)
midiin = rtmidi.MidiIn()
midiin.open_port(midi_in_port)
midiin.ignore_types(sysex=False)
print(f"Waiting to receive dump (initiate on device side)...")
last_packet_received = -1
total_received = 0
pbar = None
output_file = None
try:
while True:
message = midiin.get_message()
if message:
message_bytes, time = message
message_bytes = bytes(message_bytes)
# Check for the Sysex Sample Dump header:
if message_bytes[0:2] != b'\xf0\x7e':
continue
if message_bytes[3] == HEADER_TAG:
sample_resolution = message_bytes[6]
if sample_resolution != 16:
raise NotImplementedError(
"Support for bit depths other than 16-bit is not yet implemented."
)
sample_period_ns = (
message_bytes[7] | (message_bytes[8] << 7) | (message_bytes[9] << 14)
)
sample_rate_hz = int(1_000_000_000 / sample_period_ns)
print(
f"Length in words bytes: {message_bytes[10]} {message_bytes[11]}"
f" {message_bytes[12]}"
)
length_in_words = (
message_bytes[10] | (message_bytes[11] << 7) | (message_bytes[12] << 14)
)
if length_in_words == 1:
# Could be roll over!
length_in_words = 2097152
print(
f"Receiving a dump of {length_in_words:,} samples at {sample_rate_hz:,}Hz"
f" {sample_resolution}-bit."
)
output_file = sf.SoundFile(
args.output_file,
'w',
samplerate=sample_rate_hz,
channels=1,
subtype='PCM_16'
)
pbar = tqdm(total=length_in_words)
elif message_bytes[3] == PACKET_TAG:
packet_number = message_bytes[4]
if packet_number != last_packet_received + 1:
print(
f"Warning: dropped packet! Received #{packet_number}, expected"
f" #{last_packet_received:,}."
)
continue
data = message_bytes[5:125]
# Decode data from 3-byte format:
for a, b, c in chunks(data, 3):
sample = (a << 9 | b << 2 | c >> 5) - 32768
output_file.write(np.array([sample], dtype=np.int16))
total_received += 1
pbar.update()
output_file.flush()
# TODO: checksum! Second-last byte contains:
# This is the XOR of the bytes 0x7E, cc, 0x02, kk, and all
# 120 bytes of waveform data (with bit 7 of result masked off to 0).
if last_packet_received == 126:
last_packet_received = -1
else:
last_packet_received += 1
except KeyboardInterrupt:
pass
if pbar:
pbar.close()
if output_file:
output_file.close()
print(f"Done! Received {total_received:,} samples of data.")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment