Last active
February 11, 2024 14:40
-
-
Save 7bitlyrus/daf0125b5b2e92309defb4f14edb67b0 to your computer and use it in GitHub Desktop.
Python script that interfaces with a voice modem to play arbitrary audio files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import serial, time, tempfile, subprocess, math, sys | |
# https://en.wikipedia.org/wiki/Hayes_command_set
# https://en.wikipedia.org/wiki/Voice_modem_command_set
class Modem:
    """Minimal AT-command wrapper around a serial-attached voice modem.

    Can be used as a context manager: leaving the ``with`` block sends a
    hang-up if the line was taken off hook through this object, then closes
    the serial port.
    """

    Serial = None  # serial.Serial handle, assigned in __init__
    off_hook_status = None  # truthy while this object believes the line is off hook

    def __init__(self, port, baud=112500, timeout=5):
        """Open *port*, request command mode and reset the modem.

        Args:
            port: Serial device path, e.g. ``/dev/ttyACM0``.
            baud: Baud rate handed to ``serial.Serial``.
            timeout: Read timeout in seconds handed to ``serial.Serial``.
        """
        # NOTE(review): 112500 looks like a transposition of the standard
        # 115200 rate; the default is left untouched so callers see no change.
        # `timeout` must be a keyword argument: the third positional parameter
        # of serial.Serial is `bytesize`, so passing it positionally opened
        # the port with 5 data bits and no read timeout.
        self.Serial = serial.Serial(port, baud, timeout=timeout)
        # NOTE(review): send() appends CR/LF, so this is not a guard-timed
        # Hayes escape; behaviour is kept exactly as the original had it.
        self.send("+++")  # Ensure we are in command mode
        self.send("ATZ")  # Send reset signal
        time.sleep(1)
        # Use the same attribute the other methods and __exit__ read.
        self.off_hook_status = False

    def write(self, data):
        """Write raw bytes to the serial port without any framing."""
        self.Serial.write(data)

    def send(self, cmd):
        """Echo *cmd* to stdout, send it terminated by CR/LF, then pause 0.1 s."""
        print(f">{cmd}")
        self.write(f"{cmd}\r\n".encode())
        time.sleep(0.1)

    def set_off_hook(self, off_hook):
        """Send ``ATH1`` (off hook) or ``ATH`` (on hook) and record the state."""
        self.send("ATH1" if off_hook else "ATH")
        self.off_hook_status = off_hook

    def set_vls(self, mode):
        """Send ``AT+VLS=<mode>``; any non-zero mode is recorded as off hook."""
        self.send(f"AT+VLS={mode}")
        self.off_hook_status = mode != 0

    def set_class(self, cls):
        """Send ``AT+FCLASS=<cls>`` to select the modem service class."""
        self.send(f"AT+FCLASS={cls}")

    def set_vsm(self, method, rate):
        """Send ``AT+VSM=<method>,<rate>`` to select the voice sampling method."""
        self.send(f"AT+VSM={method},{rate}")

    def dial(self, number):
        """Send ``ATD<number>`` and wait 0.1 s per character plus 2 s."""
        self.off_hook_status = True
        self.send(f"ATD{number}")
        time.sleep(0.1 * len(number) + 2)

    def __enter__(self):
        """Return the modem itself for use inside a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Hang up if needed and always release the serial port."""
        try:
            if self.off_hook_status:
                self.set_off_hook(False)
        finally:
            # Close the port even if the hang-up command could not be written.
            self.Serial.close()
class Music:
    """Audio clip converted to raw PCM by ffmpeg and pre-escaped for voice transmit.

    The converted samples are held in ``byteData`` with DLE bytes doubled and
    runs of ``+`` broken up, ready to be streamed by :meth:`play`.
    """

    byteData = None  # bytearray of shielded sample data
    bitrate = None  # bits per second; used to size each write
    buffer_length = None  # never assigned by this class; kept for compatibility
    frame_length = None  # multiplier applied to bytes-per-second for each write
    loop = None  # when truthy, play() repeats until interrupted

    def __init__(self, filename, format, rate, bitrate, *, frame_length=1, loop=False):
        """Convert *filename* with ffmpeg and store the shielded bytes.

        Args:
            filename: Input audio path. A backtick is translated to a space
                (legacy escape from when the command was split on spaces).
            format: ffmpeg raw format name; also used as ``pcm_<format>`` codec.
            rate: Output sample rate passed to ``-ar``.
            bitrate: Bits per second of the converted stream.
            frame_length: Multiplier for the per-write chunk size.
            loop: Repeat playback until interrupted.

        Raises:
            subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        """
        self.bitrate = bitrate
        self.frame_length = frame_length
        self.loop = loop
        with tempfile.TemporaryDirectory() as tmp_dir:
            outfile = f"{tmp_dir}/file.raw"
            # https://stackoverflow.com/questions/4854513/
            # Build the argument list directly so paths containing spaces stay
            # a single argument instead of being split apart.
            arguments = [
                "ffmpeg",
                "-i", filename,
                "-f", format,
                "-acodec", f"pcm_{format}",
                "-ac", "1",
                "-ar", rate,
                outfile,
            ]
            subprocess.check_call([str(arg).replace("`", " ") for arg in arguments])
            with open(outfile, "rb") as file:
                raw = file.read()
        self.byteData = self._shield(raw)

    @staticmethod
    def _shield(raw):
        """Return *raw* as a bytearray with DLE doubled and ``++`` pairs split."""
        # Send the DLE byte twice to allow it to be used as a byte of data.
        shielded = bytearray(raw).replace(b"\x10", b"\x10\x10")
        # Add a DLE sequence inside series of + characters to prevent an
        # accidental escape. This must run after the doubling above so the
        # inserted DLE is not doubled itself.
        return shielded.replace(b"\x2B\x2B", b"\x2B\x10\x00\x2B")

    def play(self, modem):
        """Stream the clip to *modem*; Ctrl-C cancels playback and is swallowed.

        Args:
            modem: Object providing ``send(cmd)`` and ``write(data)``.
        """
        try:
            # Clamp to 1 so a tiny bitrate or zero frame_length cannot produce
            # a zero step, which would make range() raise ValueError.
            bytes_per_frame = max(1, math.ceil((self.bitrate // 8) * self.frame_length))
            modem.send("AT+VTX")  # Start voice transmit
            while True:
                print("Playing", end="", flush=True)
                for start in range(0, len(self.byteData), bytes_per_frame):
                    modem.write(self.byteData[start : start + bytes_per_frame])
                    print(".", end="", flush=True)
                if not self.loop:
                    break
            print("Done.")
            modem.write(b"\x10\x21")  # Send DLE + ! to indicate end of data
        except KeyboardInterrupt:
            modem.write(b"\x10\x18")  # Send DLE + CAN to cancel playback
            modem.write(b"\x10\x21")  # Send DLE + ! to indicate end of data
            print("Aborted.")
# Validate the command line before touching the modem, so a missing argument
# yields a usage message instead of an IndexError after the modem has already
# been reconfigured.
if len(sys.argv) < 2:
    sys.exit(f"usage: {sys.argv[0]} AUDIO_FILE")

with Modem("/dev/ttyACM0") as modem:
    modem.set_class(8)  # Set service class to voice
    modem.set_vsm(0, 8000)  # Set voice class to Signed PCM, 8-bit, 8000 Hz
    music = Music(sys.argv[1], "s8", 8000, 8000 * 8, loop=True)
    # Voice Line Selection to Mode 1, takes off hook (we do this instead of ATH1 to fix a bug)
    modem.set_vls(1)
    music.play(modem)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import serial, time, tempfile, subprocess, math, sys | |
# https://en.wikipedia.org/wiki/Hayes_command_set
# https://en.wikipedia.org/wiki/Voice_modem_command_set
class Modem:
    """Minimal AT-command wrapper around a serial-attached voice modem."""

    Serial = None  # serial.Serial handle, assigned in __init__

    def __init__(self, port, baud=112500, timeout=5):
        """Open *port*, request command mode and reset the modem.

        Args:
            port: Serial device path, e.g. ``/dev/ttyACM0``.
            baud: Baud rate handed to ``serial.Serial``.
            timeout: Read timeout in seconds handed to ``serial.Serial``.
        """
        # NOTE(review): 112500 looks like a transposition of the standard
        # 115200 rate; the default is left untouched so callers see no change.
        # `timeout` must be a keyword argument: the third positional parameter
        # of serial.Serial is `bytesize`, so passing it positionally opened
        # the port with 5 data bits and no read timeout.
        self.Serial = serial.Serial(port, baud, timeout=timeout)
        # NOTE(review): send() appends CR/LF, so this is not a guard-timed
        # Hayes escape; behaviour is kept exactly as the original had it.
        self.send("+++")  # Ensure we are in command mode
        self.send("ATZ")  # Send reset signal
        time.sleep(1)
        self.off_hook = False

    def write(self, data):
        """Write raw bytes to the serial port without any framing."""
        self.Serial.write(data)

    def send(self, cmd):
        """Echo *cmd* to stdout, send it terminated by CR/LF, then pause 0.1 s."""
        print(f">{cmd}")
        self.write(f"{cmd}\r\n".encode())
        time.sleep(0.1)

    def set_off_hook(self, off_hook):
        """Send ``ATH1`` (off hook) or ``ATH`` (on hook) and record the state."""
        self.send("ATH1" if off_hook else "ATH")
        # Keep the flag current; previously it was set once and never updated.
        self.off_hook = off_hook

    def set_vls(self, mode):
        """Send ``AT+VLS=<mode>``; any non-zero mode is recorded as off hook."""
        self.send(f"AT+VLS={mode}")
        self.off_hook = mode != 0

    def set_class(self, cls):
        """Send ``AT+FCLASS=<cls>`` to select the modem service class."""
        self.send(f"AT+FCLASS={cls}")

    def set_vsm(self, method, rate):
        """Send ``AT+VSM=<method>,<rate>`` to select the voice sampling method."""
        self.send(f"AT+VSM={method},{rate}")

    def dial(self, number):
        """Send ``ATD<number>`` and wait 0.1 s per character plus 2 s."""
        self.off_hook = True
        self.send(f"ATD{number}")
        time.sleep(0.1 * len(number) + 2)
class Music:
    """Audio clip converted to raw PCM by ffmpeg and pre-escaped for voice transmit.

    The converted samples are held in ``byteData`` with DLE bytes doubled and
    runs of ``+`` broken up, ready to be streamed by :meth:`play`.
    """

    byteData = None  # bytearray of shielded sample data
    bitrate = None  # bits per second; used to size each write
    buffer_length = None  # never assigned by this class; kept for compatibility
    frame_length = None  # multiplier applied to bytes-per-second for each write
    loop = None  # when truthy, play() repeats until interrupted

    def __init__(self, filename, format, rate, bitrate, *, frame_length=1, loop=False):
        """Convert *filename* with ffmpeg and store the shielded bytes.

        Args:
            filename: Input audio path. A backtick is translated to a space
                (legacy escape from when the command was split on spaces).
            format: ffmpeg raw format name; also used as ``pcm_<format>`` codec.
            rate: Output sample rate passed to ``-ar``.
            bitrate: Bits per second of the converted stream.
            frame_length: Multiplier for the per-write chunk size.
            loop: Repeat playback until interrupted.

        Raises:
            subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        """
        self.bitrate = bitrate
        self.frame_length = frame_length
        self.loop = loop
        with tempfile.TemporaryDirectory() as tmp_dir:
            outfile = f"{tmp_dir}/file.raw"
            # https://stackoverflow.com/questions/4854513/
            # Build the argument list directly so paths containing spaces stay
            # a single argument instead of being split apart.
            arguments = [
                "ffmpeg",
                "-i", filename,
                "-f", format,
                "-acodec", f"pcm_{format}",
                "-ac", "1",
                "-ar", rate,
                # ffmpeg honours only the last -af given for a stream, so the
                # two filters are joined into one chain; passing them as
                # separate -af options silently dropped dynaudnorm.
                "-af", "dynaudnorm,volume=1.5",
                outfile,
            ]
            subprocess.check_call([str(arg).replace("`", " ") for arg in arguments])
            with open(outfile, "rb") as file:
                raw = file.read()
        self.byteData = self._shield(raw)

    @staticmethod
    def _shield(raw):
        """Return *raw* as a bytearray with DLE doubled and ``++`` pairs split."""
        # Send the DLE byte twice to allow it to be used as a byte of data.
        shielded = bytearray(raw).replace(b"\x10", b"\x10\x10")
        # Add a DLE sequence inside series of + characters to prevent an
        # accidental escape. This must run after the doubling above so the
        # inserted DLE is not doubled itself.
        return shielded.replace(b"\x2B\x2B", b"\x2B\x10\x00\x2B")

    def play(self, modem):
        """Stream the clip to *modem*; Ctrl-C cancels playback and propagates.

        Args:
            modem: Object providing ``send(cmd)`` and ``write(data)``.

        Raises:
            KeyboardInterrupt: Re-raised after the cancel sequence is written.
        """
        try:
            # Clamp to 1 so a tiny bitrate or zero frame_length cannot produce
            # a zero step, which would make range() raise ValueError.
            bytes_per_frame = max(1, math.ceil((self.bitrate // 8) * self.frame_length))
            modem.send("AT+VTX")  # Start voice transmit
            while True:
                print("Playing", end="", flush=True)
                for start in range(0, len(self.byteData), bytes_per_frame):
                    modem.write(self.byteData[start : start + bytes_per_frame])
                    print(".", end="", flush=True)
                if not self.loop:
                    print("Done.")
                    break
            modem.write(b"\x10\x21")  # Send DLE + ! to indicate end of data
        except KeyboardInterrupt:
            modem.write(b"\x10\x18")  # Send DLE + CAN to cancel playback
            modem.write(b"\x10\x21")  # Send DLE + ! to indicate end of data
            print("Aborted.")
            raise  # let the caller decide what happens after an abort
# Validate the command line before touching the modem, so a missing argument
# yields a usage message instead of an IndexError after the modem has already
# been reconfigured.
if len(sys.argv) < 2:
    sys.exit(f"usage: {sys.argv[0]} AUDIO_FILE")

modem = Modem("/dev/ttyACM0")
modem.set_class(8)  # Set service class to voice
modem.set_vsm(0, 8000)  # Set voice class to Signed PCM, 8-bit, 8000 Hz

# http://downloads.asterisk.org/pub/telephony/sounds/releases/asterisk-extra-sounds-en-wav-1.5.tar.gz
# http://downloads.asterisk.org/pub/telephony/sounds/releases/asterisk-core-sounds-en-wav-1.6.tar.gz
beep = Music("descending-2tone.wav", "s8", 8000, 8000 * 8)
connect = Music("privacy-please-stay-on-line-to-be-connected.wav", "s8", 8000, 8000 * 8)
music = Music(sys.argv[1], "s8", 8000, 8000 * 8, loop=True)

# Voice Line Selection to Mode 1, takes off hook (we do this instead of ATH1 to fix a bug)
modem.set_vls(1)
try:
    beep.play(modem)
    time.sleep(0.25)
    music.play(modem)  # loops until interrupted with Ctrl-C
except KeyboardInterrupt:
    time.sleep(0.5)
    connect.play(modem)
    modem.dial("!")  # Flash Hook
    time.sleep(0.1)
finally:
    # Hang up on every exit path, including errors and a second Ctrl-C.
    modem.set_off_hook(False)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment