Skip to content

Instantly share code, notes, and snippets.

@aynik
Last active February 12, 2024 13:45
Show Gist options
  • Save aynik/d59eaef65568fb72ea9ff677b1c348e3 to your computer and use it in GitHub Desktop.
Save aynik/d59eaef65568fb72ea9ff677b1c348e3 to your computer and use it in GitHub Desktop.
QAM/OFDM Audio Modem
#!/usr/bin/env python3
import os
import sys
import types
import struct
import itertools
import functools
import collections
try:
import numpy as np
import reedsolo
except ModuleNotFoundError as error:
print(f"Error: Required module '{error.name}' not found", file=sys.stderr)
exit(1)
def make_conf(**kwargs):
conf = types.SimpleNamespace()
conf.sampling_frequency = 16e3
conf.num_points = 256
conf.frequency_spacing = 1e3
conf.max_frequency = 6e3
conf.gain = 1.0
conf.bits_per_sample = 16
conf.block_size = 255
conf.ecc_symbols = 7
conf.coherence_threshold = 0.9
conf.equalizer_length = 100
conf.silence_length = 10
conf.silence_start = 0.1
conf.silence_stop = 0.1
conf.skip_start = 0.1
conf.timeout = 60.0
conf.eof_block = b""
conf.prefix_fmt = ">B"
conf.__dict__.update(kwargs)
configure_prefix(conf)
configure_bitrate(conf)
configure_bit_packer(conf)
configure_carriers(conf)
configure_omegas(conf)
configure_symbols(conf)
configure_modem(conf)
configure_interpolator(conf)
configure_ecc(conf)
configure_info(conf)
return conf
def configure_prefix(conf):
assert conf.equalizer_length is not None, "equalizer_length must be set"
assert conf.silence_length is not None, "silence_length must be set"
conf.prefix_bytes = [1] * conf.equalizer_length + [0] * conf.silence_length
conf.carrier_duration = sum(conf.prefix_bytes)
conf.carrier_threshold = int(conf.coherence_threshold * conf.carrier_duration)
conf.search_window = int(0.1 * conf.carrier_duration)
conf.start_pattern_length = conf.search_window // 4
def configure_bitrate(conf):
assert conf.sampling_frequency is not None, "sampling_frequency must be set"
assert conf.num_points is not None, "num_points must be set"
assert conf.max_frequency is not None, "max_frequency must be set"
assert conf.frequency_spacing is not None, "frequency_spacing must be set"
conf.frequencies = np.arange(
conf.frequency_spacing,
conf.max_frequency + conf.frequency_spacing,
conf.frequency_spacing
)
conf.frequency_scaling = conf.sampling_frequency
conf.bits_per_symbol = int(np.log2(conf.num_points))
conf.bits_per_baud = conf.bits_per_symbol * len(conf.frequencies)
conf.signaling_interval = 1.0 / conf.sampling_frequency
conf.symbol_duration = 1 / conf.frequency_spacing
conf.symbol_rate = 1 / conf.symbol_duration
conf.num_symbols = int(conf.symbol_duration / conf.signaling_interval)
conf.baud_rate = int(conf.symbol_rate)
conf.modem_bps = conf.baud_rate * conf.bits_per_baud
def configure_bit_packer(conf):
bits = [tuple((1 if b else 0) for b in [index & (2**k) for k in range(8)]) for index in range(2**8)]
conf.to_bits = dict((i, bit) for i, bit in enumerate(bits))
conf.to_byte = dict((bit, i) for i, bit in enumerate(bits))
def configure_carriers(conf):
assert conf.num_symbols is not None, "num_symbols must be set"
assert conf.signaling_interval is not None, "signaling_interval must be set"
assert conf.frequencies is not None, "frequencies must be set"
conf.carrier_frequency = conf.frequencies[0]
conf.carriers = np.array([np.exp(2j * np.pi * f * np.arange(0, conf.num_symbols) * conf.signaling_interval) for f in conf.frequencies])
def configure_omegas(conf):
assert conf.frequencies is not None, "frequencies must be set"
assert conf.sampling_frequency is not None, "sampling_frequency must be set"
assert conf.carrier_frequency is not None, "carrier_frequency must be set"
conf.omegas = 2 * np.pi * np.array(conf.frequencies) / conf.sampling_frequency
conf.index_omega = 2 * np.pi * conf.carrier_frequency / conf.sampling_frequency
def configure_symbols(conf):
assert conf.bits_per_symbol is not None, "bits_per_symbol must be set"
assert conf.num_points is not None, "num_points must be set"
symbols = int(np.ceil(conf.bits_per_symbol // 2))
symbols = np.array([complex(x, y) for x in range(2**symbols) for y in range(conf.num_points // 2**symbols)])
symbols = symbols - symbols[-1] / 2
conf.symbols = symbols / np.max(np.abs(symbols))
def configure_modem(conf):
assert conf.bits_per_symbol is not None, "bits_per_symbol must be set"
assert conf.symbols is not None, "symbols must be set"
conf.encode_map = {tuple([int(i & (1 << j) != 0) for j in range(conf.bits_per_symbol)]): v for i, v in enumerate(conf.symbols)}
bits_map = { v: k for k, v in conf.encode_map.items()}
conf.decode_list = [(s, bits_map[s]) for s in conf.symbols]
def configure_interpolator(conf):
conf.interpolator_width = 128
conf.interpolator_resolution = 1024
N = conf.interpolator_resolution * conf.interpolator_width
u = np.arange(-N, N, dtype=float)
window = np.cos(0.5 * np.pi * u / N) ** 2.0
h = np.sinc(u / conf.interpolator_resolution) * window
conf.interpolator_filters = []
for index in range(conf.interpolator_resolution):
interpolator_filter = h[index :: conf.interpolator_resolution]
interpolator_filter = interpolator_filter[::-1]
conf.interpolator_filters.append(interpolator_filter)
conf.interpolator_coeff_len = 2 * conf.interpolator_width
def configure_ecc(conf):
assert conf.block_size is not None, "block_size must be set"
assert conf.ecc_symbols is not None, "ecc_symbols must be set"
assert conf.prefix_fmt is not None, "prefix_fmt must be set"
conf.frame_size = conf.block_size - conf.ecc_symbols
conf.payload_size = conf.frame_size - struct.calcsize(conf.prefix_fmt)
conf.ecc = reedsolo.RSCodec(conf.ecc_symbols, conf.block_size)
def configure_info(conf):
conf.info = "{0:.1f} kb/s ({1:d}-QAM x {2:d} carriers) Fs={3:.1f} kHz".format(
conf.modem_bps / 1e3,
len(conf.symbols),
len(conf.frequencies),
conf.sampling_frequency / 1e3,
)
def chain_wrapper(func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
result = func(*args, **kwargs)
return itertools.chain.from_iterable(result)
return wrapped
@chain_wrapper
def to_bytes(conf, bits):
for chunk in iterate(bits, 8, dtype=tuple, truncate=True):
yield [conf.to_byte[chunk]]
@chain_wrapper
def encode_payload(conf, data):
for frame in encode_frame(conf, data):
for byte in frame:
yield conf.to_bits[byte]
def take(iterable, n):
return np.array(list(itertools.islice(iterable, n)))
def icapture(iterable, result):
for i in iter(iterable):
result.append(i)
yield i
def split(iterable, n):
def _gen(it, index):
for item in it:
yield item[index]
iterables = itertools.tee(iterable, n)
return [_gen(it, index) for index, it in enumerate(iterables)]
def iterate_index(data, size, dtype=None, truncate=True):
offset = 0
data = iter(data)
while True:
buffer = list(itertools.islice(data, size))
if len(buffer) < size:
if truncate or not buffer:
return
result = dtype(buffer) if dtype else np.array(buffer)
yield (offset, result)
offset += size
def iterate(data, size, dtype=None, truncate=True):
yield from (result for _, result in iterate_index(data, size, dtype, truncate))
def prbs(reg, poly, bits):
while True:
mask = (1 << bits) - 1
size = 0
while (poly >> size) > 1:
size += 1
while True:
yield reg & mask
reg = reg << 1
if reg >> size:
reg = reg ^ poly
def reader(fd, size=(8 << 10), dtype=None):
while True:
data = fd.read(size)
if not data:
break
yield dtype(data) if dtype else data
def solver(t, y):
N = len(t)
t0 = np.array([1.0 / t[0]])
f = [t0]
b = [t0]
for n in range(1, N):
prev_f = f[-1]
prev_b = b[-1]
ef = sum(t[n - i] * prev_f[i] for i in range(n))
eb = sum(t[i + 1] * prev_b[i] for i in range(n))
f_ = np.concatenate([prev_f, np.array([0])])
b_ = np.concatenate([np.array([0]), prev_b])
det = 1.0 - ef * eb
f.append((f_ - ef * b_) / det)
b.append((b_ - eb * f_) / det)
x = []
for n in range(N):
x = np.concatenate([x, np.array([0])])
ef = sum(t[n - i] * x[i] for i in range(n))
x = x + (y[n] - ef) * b[n]
return x
def exp_iwt(omega, n):
return np.exp(1j * omega * np.arange(n))
def norm(x):
return np.sqrt(np.dot(x.conj(), x).real)
def rms(x):
return np.mean(np.abs(x) ** 2, axis=0) ** 0.5
def coherence(x, omega):
n = len(x)
Hc = exp_iwt(-omega, n) / np.sqrt(0.5 * n)
norm_x = norm(x)
if not norm_x:
return 0.0
return np.dot(Hc, x) / norm_x
def linear_regression(x, y):
x = np.array(x)
y = np.array(y)
mean_x = np.mean(x)
mean_y = np.mean(y)
x_ = x - mean_x
y_ = y - mean_y
a = np.dot(y_, x_) / np.dot(x_, x_)
b = mean_y - a * mean_x
return a, b
class Sampler:
def __init__(self, src):
self.src = iter(src)
def __call__(self, conf, size):
return take(self.src, size)
class InterpolatedSampler:
def __init__(self, conf, src, frequency=1.0, interpolate=False):
self.index = 0
self.frequency = frequency
self.interpolate = interpolate
self.offset = conf.interpolator_width + 1
self.buffer = np.zeros(conf.interpolator_coeff_len)
self.equalizer = lambda x: x
padding = [0.0] * conf.interpolator_width
self.src = itertools.chain(padding, src)
def __call__(self, conf, size):
frame = np.zeros(size)
count = 0
for frame_index in range(size):
k = int(self.offset)
j = int((self.offset - k) * conf.interpolator_resolution)
coeffs = conf.interpolator_filters[j]
end = k + conf.interpolator_width
try:
while self.index < end:
self.buffer[:-1] = self.buffer[1:]
self.buffer[-1] = next(self.src)
self.index += 1
except StopIteration:
break
self.offset += self.frequency
frame[frame_index] = np.dot(coeffs, self.buffer)
count = frame_index + 1
return self.equalizer(frame[:count])
class FIR:
def __init__(self, h):
self.h = np.array(h)
self.x_state = [0] * len(self.h)
def __call__(self, x):
x_ = self.x_state
h = self.h
for v in x:
x_ = [v] + x_[:-1]
yield np.dot(x_, h)
self.x_state = x_
def train_symbols(conf, length, constant_prefix=16):
r = prbs(reg=1, poly=0x1100B, bits=2)
constellation = [1, 1j, -1, -1j]
symbols = []
frequency_range = range(len(conf.frequencies))
for _ in range(length):
symbols.append([constellation[next(r)] for _ in frequency_range])
symbols = np.array(symbols)
symbols[:constant_prefix, :] = 1
return symbols
def modulate_training(conf, symbols):
gain = 1.0 / len(conf.carriers)
result = []
for s in symbols:
result.append(np.dot(s, conf.carriers))
result = np.multiply(np.concatenate(result).real, gain)
return result
def demodulate_training(conf, signal, size):
signal = itertools.chain(signal, itertools.repeat(0))
sampler = make_sampler(conf, signal)
symbols = demux(conf, sampler)
return np.array(list(itertools.islice(symbols, size)))
def write_symbol(conf, symbol, fd):
symbol = np.array(symbol) * conf.gain
symbol = symbol.real * conf.frequency_scaling
data = symbol.astype("int16").tobytes()
fd.write(data)
def write_equalization(conf, fd):
carriers = conf.carriers / len(conf.frequencies)
for value in conf.prefix_bytes:
write_symbol(conf, carriers[0] * value, fd)
silence = np.zeros(conf.silence_length * conf.num_symbols)
training = train_symbols(conf, conf.equalizer_length)
training = modulate_training(conf, training)
write_symbol(conf, silence, fd)
write_symbol(conf, training, fd)
write_symbol(conf, silence, fd)
def pack_block(conf, block):
frame = struct.pack(conf.prefix_fmt, len(block)) + block
return conf.ecc.encode(frame.ljust(conf.frame_size, b'\0'))
def encode_frame(conf, data):
for block in iterate(
data,
conf.payload_size,
bytearray,
truncate=False,
):
yield pack_block(conf, block)
yield pack_block(conf, conf.eof_block)
def decode_frame(conf, data):
data = iter(data)
while True:
chunk = bytearray(itertools.islice(data, conf.block_size))
block, _, _ = conf.ecc.decode(chunk)
prefix_length = struct.calcsize(conf.prefix_fmt)
length = struct.unpack(
conf.prefix_fmt,
block[:prefix_length]
)[0]
frame = block[prefix_length:prefix_length+length]
if len(frame) < length:
raise ValueError("Missing payload data")
if len(frame) == 0:
return
yield frame
def decode_frames(conf, bits):
for frame in decode_frame(conf, to_bytes(conf, bits)):
yield bytes(frame)
def modem_encode(conf, bits):
for bits_chunk in iterate(bits, conf.bits_per_symbol, tuple):
yield conf.encode_map[bits_chunk]
def modem_decode(conf, symbols, error_handler):
for received in symbols:
error = np.abs(conf.symbols - received)
index = np.argmin(error)
decoded, bits = conf.decode_list[index]
error_handler(received, decoded)
yield bits
def write_payload(conf, bits, fd):
carriers = conf.carriers / len(conf.frequencies)
sent = 0
symbols = itertools.chain(bits, [0] * conf.bits_per_baud * 2)
symbols = iterate(modem_encode(conf, symbols), len(carriers))
for i, symbol in enumerate(symbols):
write_symbol(conf, np.dot(np.array(tuple(symbol)), carriers), fd)
sent = i * len(conf.frequencies) * conf.bits_per_symbol
if i % conf.baud_rate == 0:
print(f"{conf.info}: TX {sent / 8e3:.3f} kB", file=sys.stderr, end="\r")
print(f"{conf.info}: TX {sent / 8e3:.3f} kB", file=sys.stderr)
def send(conf, src, dst):
silence_start = np.zeros(int(conf.sampling_frequency * conf.silence_start))
write_symbol(conf, silence_start, dst)
write_equalization(conf, dst)
payload = encode_payload(conf, itertools.chain.from_iterable(reader(src)))
write_payload(conf, payload, dst)
silence_stop = np.zeros(int(conf.sampling_frequency * conf.silence_stop))
write_symbol(conf, silence_stop, dst)
def demux(conf, sampler):
filters = np.array([exp_iwt(-w, conf.num_symbols) / (0.5 * conf.num_symbols) for w in conf.omegas])
while True:
frame = sampler(conf, conf.num_symbols)
if len(frame) != conf.num_symbols:
break
yield np.dot(filters, frame)
def detector_wait(conf, samples):
counter = 0
max_offset = conf.timeout * conf.sampling_frequency
buffers = collections.deque([], maxlen=conf.baud_rate)
for offset, buffer in iterate_index(samples, conf.num_symbols):
if offset > max_offset:
raise ValueError("Timeout waiting for carrier")
buffers.append(buffer)
coeff = coherence(buffer, conf.index_omega)
counter = counter + 1 if abs(coeff) > conf.coherence_threshold else 0
if counter == conf.carrier_threshold:
return offset, buffers
raise ValueError("No carrier detected")
def detector_run(conf, samples):
offset, bufs = detector_wait(conf, samples)
length = (conf.carrier_threshold - 1) * conf.num_symbols
begin = offset - length
start_time = begin * conf.symbol_duration / conf.num_symbols
bufs = list(bufs)[-conf.carrier_threshold - conf.search_window :]
n = conf.search_window + conf.carrier_duration - conf.carrier_threshold
trailing = list(itertools.islice(samples, n * conf.num_symbols))
bufs.append(np.array(trailing))
buf = np.concatenate(bufs)
offset = detector_find_start(conf, buf)
start_time += (offset / conf.num_symbols - conf.search_window) * conf.symbol_duration
buf = buf[offset:]
prefix_length = conf.carrier_duration * conf.num_symbols
amplitude, frequency_error = detector_estimate(conf, buf[:prefix_length])
return (itertools.chain(buf, samples), amplitude, frequency_error)
def detector_find_start(conf, buf):
carrier = exp_iwt(conf.index_omega, conf.num_symbols)
carrier = np.tile(carrier, conf.start_pattern_length)
zeroes = carrier * 0.0
signal = np.concatenate([zeroes, carrier])
signal = (2**0.5) * signal / norm(signal)
corr = np.abs(np.correlate(buf, signal))
norm_b = np.sqrt(np.correlate(np.abs(buf) ** 2, np.ones(len(signal))))
coeffs = np.zeros_like(corr)
coeffs[norm_b > 0.0] = corr[norm_b > 0.0] / norm_b[norm_b > 0.0]
index = np.argmax(coeffs)
offset = index + len(zeroes)
return offset
def detector_estimate(conf, buffer, skip=5):
filters = exp_iwt(-conf.index_omega, conf.num_symbols) / (0.5 * conf.num_symbols)
frames = iterate(buffer, conf.num_symbols)
symbols = [np.dot(filters, frame) for frame in frames]
symbols = np.array(symbols[skip:-skip])
amplitude = np.mean(np.abs(symbols))
phase = np.unwrap(np.angle(symbols)) / (2 * np.pi)
indices = np.arange(len(phase))
a, _ = linear_regression(indices, phase)
frequency_error = a / (conf.symbol_duration * conf.carrier_frequency)
return amplitude, frequency_error
def receiver_prefix(conf, symbols, gain=1.0):
S = take(symbols, len(conf.prefix_bytes))
S = S[:, 0] * gain
sliced = np.round(np.abs(S))
bits = np.array(sliced, dtype=int)
errors = bits != conf.prefix_bytes
if any(errors):
raise ValueError(f"Incorrect prefix: {sum(errors)} errors")
def receiver_train(conf, sampler, order, lookahead):
trained_symbols = train_symbols(conf, conf.equalizer_length)
trained_signal = modulate_training(conf, trained_symbols) * len(conf.frequencies)
prefix_bytes = conf.silence_length * conf.num_symbols
postfix_bytes = prefix_bytes
signal_length = conf.equalizer_length * conf.num_symbols + prefix_bytes + postfix_bytes
signal = sampler(conf, signal_length + lookahead)
padding = np.zeros(lookahead)
x = np.concatenate([signal[prefix_bytes:-postfix_bytes], padding])
y = np.concatenate([padding, np.concatenate([trained_signal, np.zeros(lookahead)])])
N = order + lookahead
Rxx = np.zeros(N)
Rxy = np.zeros(N)
for i in range(N):
Rxx[i] = np.dot(x[i:], x[: len(x) - i])
Rxy[i] = np.dot(y[i:], x[: len(x) - i])
coeffs = solver(t=Rxx, y=Rxy)
equalization_filter = FIR(coeffs)
equalized = list(equalization_filter(signal))
equalized = equalized[prefix_bytes + lookahead : -postfix_bytes + lookahead]
verify_training(conf, equalized, trained_symbols)
return equalization_filter
def verify_training(conf, equalized, train_symbols):
equalizer_length = conf.equalizer_length
symbols = demodulate_training(conf, equalized, conf.equalizer_length)
sliced = np.array(symbols).round()
errors = np.array(sliced - train_symbols, dtype=bool)
error_rate = errors.sum() / errors.size
errors = np.array(symbols - train_symbols)
noise_rms = rms(errors)
signal_rms = rms(train_symbols)
SNRs = 20.0 * np.log10(signal_rms / noise_rms)
print(f"\nError rate: {error_rate}", file=sys.stderr, end="\n")
for (i, freq), snr in zip(enumerate(reversed(conf.frequencies)), reversed(SNRs)):
print(f"SNR: {freq}: {snr}", file=sys.stderr, end="\n")
def receiver_bitstream(conf, symbols, error_handler):
streams = []
generators = split(symbols, n=len(conf.omegas))
for frequency, S in zip(conf.frequencies, generators):
equalized = []
S = icapture(S, result=equalized)
modem_error_handler = functools.partial(error_handler, frequency)
bits = modem_decode(conf, S, modem_error_handler)
streams.append(bits)
return zip(*streams)
def receiver_demodulate(conf, sampler, symbols):
received = 0
errors = {}
noise = {}
def error_handler(frequency, received, decoded):
errors.setdefault(frequency, []).append(received / decoded)
noise.setdefault(frequency, []).append(received - decoded)
bitstream = receiver_bitstream(conf, symbols, error_handler)
try:
for i, block_of_bits in enumerate(bitstream):
for bits in block_of_bits:
received += len(bits)
yield bits
update_sampler(conf, errors, sampler)
if i % conf.bits_per_baud == 0:
print(f"{conf.info}: RX {received / 8e3:.3f} kB", file=sys.stderr, end="\r")
finally:
print(f"{conf.info}: RX {received / 8e3:.3f} kB", file=sys.stderr)
def update_sampler(conf, errors, sampler):
frequency_error_gain = 0.01 * conf.symbol_duration
err = np.array([e for v in errors.values() for e in v])
err = np.mean(np.angle(err)) / (2 * np.pi) if err.size else 0
errors.clear()
sampler.frequency -= frequency_error_gain * err
sampler.offset -= err
def receiver_run(conf, sampler, gain, output):
symbols = demux(conf, sampler)
receiver_prefix(conf, symbols, gain)
filters = receiver_train(conf, sampler, order=10, lookahead=10)
sampler.equalizer = lambda x: list(filters(x))
bitstream = receiver_demodulate(conf, sampler, symbols)
bitstream = itertools.chain.from_iterable(bitstream)
for frame in decode_frames(conf, bitstream):
output.write(frame)
def make_sampler(conf, src, frequency=1.0, interpolate=False):
if interpolate:
return InterpolatedSampler(conf, src, frequency, interpolate)
else:
return Sampler(src)
def recv(conf, src, dst):
src_reader = reader(src, dtype=lambda x: np.frombuffer(x, dtype="int16") / conf.frequency_scaling)
signal = itertools.chain.from_iterable(src_reader)
take(signal, int(conf.sampling_frequency * conf.skip_start))
signal, amplitude, frequency_error = detector_run(conf, signal)
sampler = make_sampler(conf, signal, 1 / (1.0 + frequency_error), interpolate=True)
receiver_run(conf, sampler, gain=1.0 / amplitude, output=dst)
dst.flush()
script_name = os.path.basename(__file__)
if script_name.endswith("-tx"):
send(make_conf(), sys.stdin.buffer, sys.stdout.buffer)
elif script_name.endswith("-rx"):
recv(make_conf(), sys.stdin.buffer, sys.stdout.buffer)
else:
print("Ambiguous script name, use -tx or -rx suffixes", file=sys.stderr)
exit(1)
@aynik
Copy link
Author

aynik commented Jun 6, 2022

Link it for sending and receiving modes:

ln -s modem-qam modem-tx
ln -s modem-qam modem-rx

MD Device Simulator:

ln -s 
modem-tx < test.tx | sox -G -t raw -c 1 -b 16 -e signed -r 16000 - -t wav -r 44100 test.tx.wav
atracdenc --encode -i test.tx.wav -o test.tx.aea
atracdenc --decode -i test.tx.aea -o test.rx.wav
sox test.rx.wav -t raw -c 1 -b 16 -e signed -r 16000 - | modem-rx > test.rx
sha1sum test.tx test.rx
ffprobe test.tx.wav 2>&1 | rg bitrate

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment