Last active
February 12, 2024 13:45
-
-
Save aynik/d59eaef65568fb72ea9ff677b1c348e3 to your computer and use it in GitHub Desktop.
QAM/OFDM Audio Modem
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import sys | |
import types | |
import struct | |
import itertools | |
import functools | |
import collections | |
try: | |
import numpy as np | |
import reedsolo | |
except ModuleNotFoundError as error: | |
print(f"Error: Required module '{error.name}' not found", file=sys.stderr) | |
exit(1) | |
def make_conf(**kwargs): | |
conf = types.SimpleNamespace() | |
conf.sampling_frequency = 16e3 | |
conf.num_points = 256 | |
conf.frequency_spacing = 1e3 | |
conf.max_frequency = 6e3 | |
conf.gain = 1.0 | |
conf.bits_per_sample = 16 | |
conf.block_size = 255 | |
conf.ecc_symbols = 7 | |
conf.coherence_threshold = 0.9 | |
conf.equalizer_length = 100 | |
conf.silence_length = 10 | |
conf.silence_start = 0.1 | |
conf.silence_stop = 0.1 | |
conf.skip_start = 0.1 | |
conf.timeout = 60.0 | |
conf.eof_block = b"" | |
conf.prefix_fmt = ">B" | |
conf.__dict__.update(kwargs) | |
configure_prefix(conf) | |
configure_bitrate(conf) | |
configure_bit_packer(conf) | |
configure_carriers(conf) | |
configure_omegas(conf) | |
configure_symbols(conf) | |
configure_modem(conf) | |
configure_interpolator(conf) | |
configure_ecc(conf) | |
configure_info(conf) | |
return conf | |
def configure_prefix(conf): | |
assert conf.equalizer_length is not None, "equalizer_length must be set" | |
assert conf.silence_length is not None, "silence_length must be set" | |
conf.prefix_bytes = [1] * conf.equalizer_length + [0] * conf.silence_length | |
conf.carrier_duration = sum(conf.prefix_bytes) | |
conf.carrier_threshold = int(conf.coherence_threshold * conf.carrier_duration) | |
conf.search_window = int(0.1 * conf.carrier_duration) | |
conf.start_pattern_length = conf.search_window // 4 | |
def configure_bitrate(conf): | |
assert conf.sampling_frequency is not None, "sampling_frequency must be set" | |
assert conf.num_points is not None, "num_points must be set" | |
assert conf.max_frequency is not None, "max_frequency must be set" | |
assert conf.frequency_spacing is not None, "frequency_spacing must be set" | |
conf.frequencies = np.arange( | |
conf.frequency_spacing, | |
conf.max_frequency + conf.frequency_spacing, | |
conf.frequency_spacing | |
) | |
conf.frequency_scaling = conf.sampling_frequency | |
conf.bits_per_symbol = int(np.log2(conf.num_points)) | |
conf.bits_per_baud = conf.bits_per_symbol * len(conf.frequencies) | |
conf.signaling_interval = 1.0 / conf.sampling_frequency | |
conf.symbol_duration = 1 / conf.frequency_spacing | |
conf.symbol_rate = 1 / conf.symbol_duration | |
conf.num_symbols = int(conf.symbol_duration / conf.signaling_interval) | |
conf.baud_rate = int(conf.symbol_rate) | |
conf.modem_bps = conf.baud_rate * conf.bits_per_baud | |
def configure_bit_packer(conf): | |
bits = [tuple((1 if b else 0) for b in [index & (2**k) for k in range(8)]) for index in range(2**8)] | |
conf.to_bits = dict((i, bit) for i, bit in enumerate(bits)) | |
conf.to_byte = dict((bit, i) for i, bit in enumerate(bits)) | |
def configure_carriers(conf): | |
assert conf.num_symbols is not None, "num_symbols must be set" | |
assert conf.signaling_interval is not None, "signaling_interval must be set" | |
assert conf.frequencies is not None, "frequencies must be set" | |
conf.carrier_frequency = conf.frequencies[0] | |
conf.carriers = np.array([np.exp(2j * np.pi * f * np.arange(0, conf.num_symbols) * conf.signaling_interval) for f in conf.frequencies]) | |
def configure_omegas(conf): | |
assert conf.frequencies is not None, "frequencies must be set" | |
assert conf.sampling_frequency is not None, "sampling_frequency must be set" | |
assert conf.carrier_frequency is not None, "carrier_frequency must be set" | |
conf.omegas = 2 * np.pi * np.array(conf.frequencies) / conf.sampling_frequency | |
conf.index_omega = 2 * np.pi * conf.carrier_frequency / conf.sampling_frequency | |
def configure_symbols(conf): | |
assert conf.bits_per_symbol is not None, "bits_per_symbol must be set" | |
assert conf.num_points is not None, "num_points must be set" | |
symbols = int(np.ceil(conf.bits_per_symbol // 2)) | |
symbols = np.array([complex(x, y) for x in range(2**symbols) for y in range(conf.num_points // 2**symbols)]) | |
symbols = symbols - symbols[-1] / 2 | |
conf.symbols = symbols / np.max(np.abs(symbols)) | |
def configure_modem(conf): | |
assert conf.bits_per_symbol is not None, "bits_per_symbol must be set" | |
assert conf.symbols is not None, "symbols must be set" | |
conf.encode_map = {tuple([int(i & (1 << j) != 0) for j in range(conf.bits_per_symbol)]): v for i, v in enumerate(conf.symbols)} | |
bits_map = { v: k for k, v in conf.encode_map.items()} | |
conf.decode_list = [(s, bits_map[s]) for s in conf.symbols] | |
def configure_interpolator(conf): | |
conf.interpolator_width = 128 | |
conf.interpolator_resolution = 1024 | |
N = conf.interpolator_resolution * conf.interpolator_width | |
u = np.arange(-N, N, dtype=float) | |
window = np.cos(0.5 * np.pi * u / N) ** 2.0 | |
h = np.sinc(u / conf.interpolator_resolution) * window | |
conf.interpolator_filters = [] | |
for index in range(conf.interpolator_resolution): | |
interpolator_filter = h[index :: conf.interpolator_resolution] | |
interpolator_filter = interpolator_filter[::-1] | |
conf.interpolator_filters.append(interpolator_filter) | |
conf.interpolator_coeff_len = 2 * conf.interpolator_width | |
def configure_ecc(conf): | |
assert conf.block_size is not None, "block_size must be set" | |
assert conf.ecc_symbols is not None, "ecc_symbols must be set" | |
assert conf.prefix_fmt is not None, "prefix_fmt must be set" | |
conf.frame_size = conf.block_size - conf.ecc_symbols | |
conf.payload_size = conf.frame_size - struct.calcsize(conf.prefix_fmt) | |
conf.ecc = reedsolo.RSCodec(conf.ecc_symbols, conf.block_size) | |
def configure_info(conf): | |
conf.info = "{0:.1f} kb/s ({1:d}-QAM x {2:d} carriers) Fs={3:.1f} kHz".format( | |
conf.modem_bps / 1e3, | |
len(conf.symbols), | |
len(conf.frequencies), | |
conf.sampling_frequency / 1e3, | |
) | |
def chain_wrapper(func): | |
@functools.wraps(func) | |
def wrapped(*args, **kwargs): | |
result = func(*args, **kwargs) | |
return itertools.chain.from_iterable(result) | |
return wrapped | |
@chain_wrapper | |
def to_bytes(conf, bits): | |
for chunk in iterate(bits, 8, dtype=tuple, truncate=True): | |
yield [conf.to_byte[chunk]] | |
@chain_wrapper | |
def encode_payload(conf, data): | |
for frame in encode_frame(conf, data): | |
for byte in frame: | |
yield conf.to_bits[byte] | |
def take(iterable, n): | |
return np.array(list(itertools.islice(iterable, n))) | |
def icapture(iterable, result): | |
for i in iter(iterable): | |
result.append(i) | |
yield i | |
def split(iterable, n): | |
def _gen(it, index): | |
for item in it: | |
yield item[index] | |
iterables = itertools.tee(iterable, n) | |
return [_gen(it, index) for index, it in enumerate(iterables)] | |
def iterate_index(data, size, dtype=None, truncate=True): | |
offset = 0 | |
data = iter(data) | |
while True: | |
buffer = list(itertools.islice(data, size)) | |
if len(buffer) < size: | |
if truncate or not buffer: | |
return | |
result = dtype(buffer) if dtype else np.array(buffer) | |
yield (offset, result) | |
offset += size | |
def iterate(data, size, dtype=None, truncate=True): | |
yield from (result for _, result in iterate_index(data, size, dtype, truncate)) | |
def prbs(reg, poly, bits): | |
while True: | |
mask = (1 << bits) - 1 | |
size = 0 | |
while (poly >> size) > 1: | |
size += 1 | |
while True: | |
yield reg & mask | |
reg = reg << 1 | |
if reg >> size: | |
reg = reg ^ poly | |
def reader(fd, size=(8 << 10), dtype=None): | |
while True: | |
data = fd.read(size) | |
if not data: | |
break | |
yield dtype(data) if dtype else data | |
def solver(t, y): | |
N = len(t) | |
t0 = np.array([1.0 / t[0]]) | |
f = [t0] | |
b = [t0] | |
for n in range(1, N): | |
prev_f = f[-1] | |
prev_b = b[-1] | |
ef = sum(t[n - i] * prev_f[i] for i in range(n)) | |
eb = sum(t[i + 1] * prev_b[i] for i in range(n)) | |
f_ = np.concatenate([prev_f, np.array([0])]) | |
b_ = np.concatenate([np.array([0]), prev_b]) | |
det = 1.0 - ef * eb | |
f.append((f_ - ef * b_) / det) | |
b.append((b_ - eb * f_) / det) | |
x = [] | |
for n in range(N): | |
x = np.concatenate([x, np.array([0])]) | |
ef = sum(t[n - i] * x[i] for i in range(n)) | |
x = x + (y[n] - ef) * b[n] | |
return x | |
def exp_iwt(omega, n): | |
return np.exp(1j * omega * np.arange(n)) | |
def norm(x): | |
return np.sqrt(np.dot(x.conj(), x).real) | |
def rms(x): | |
return np.mean(np.abs(x) ** 2, axis=0) ** 0.5 | |
def coherence(x, omega): | |
n = len(x) | |
Hc = exp_iwt(-omega, n) / np.sqrt(0.5 * n) | |
norm_x = norm(x) | |
if not norm_x: | |
return 0.0 | |
return np.dot(Hc, x) / norm_x | |
def linear_regression(x, y): | |
x = np.array(x) | |
y = np.array(y) | |
mean_x = np.mean(x) | |
mean_y = np.mean(y) | |
x_ = x - mean_x | |
y_ = y - mean_y | |
a = np.dot(y_, x_) / np.dot(x_, x_) | |
b = mean_y - a * mean_x | |
return a, b | |
class Sampler: | |
def __init__(self, src): | |
self.src = iter(src) | |
def __call__(self, conf, size): | |
return take(self.src, size) | |
class InterpolatedSampler: | |
def __init__(self, conf, src, frequency=1.0, interpolate=False): | |
self.index = 0 | |
self.frequency = frequency | |
self.interpolate = interpolate | |
self.offset = conf.interpolator_width + 1 | |
self.buffer = np.zeros(conf.interpolator_coeff_len) | |
self.equalizer = lambda x: x | |
padding = [0.0] * conf.interpolator_width | |
self.src = itertools.chain(padding, src) | |
def __call__(self, conf, size): | |
frame = np.zeros(size) | |
count = 0 | |
for frame_index in range(size): | |
k = int(self.offset) | |
j = int((self.offset - k) * conf.interpolator_resolution) | |
coeffs = conf.interpolator_filters[j] | |
end = k + conf.interpolator_width | |
try: | |
while self.index < end: | |
self.buffer[:-1] = self.buffer[1:] | |
self.buffer[-1] = next(self.src) | |
self.index += 1 | |
except StopIteration: | |
break | |
self.offset += self.frequency | |
frame[frame_index] = np.dot(coeffs, self.buffer) | |
count = frame_index + 1 | |
return self.equalizer(frame[:count]) | |
class FIR: | |
def __init__(self, h): | |
self.h = np.array(h) | |
self.x_state = [0] * len(self.h) | |
def __call__(self, x): | |
x_ = self.x_state | |
h = self.h | |
for v in x: | |
x_ = [v] + x_[:-1] | |
yield np.dot(x_, h) | |
self.x_state = x_ | |
def train_symbols(conf, length, constant_prefix=16): | |
r = prbs(reg=1, poly=0x1100B, bits=2) | |
constellation = [1, 1j, -1, -1j] | |
symbols = [] | |
frequency_range = range(len(conf.frequencies)) | |
for _ in range(length): | |
symbols.append([constellation[next(r)] for _ in frequency_range]) | |
symbols = np.array(symbols) | |
symbols[:constant_prefix, :] = 1 | |
return symbols | |
def modulate_training(conf, symbols): | |
gain = 1.0 / len(conf.carriers) | |
result = [] | |
for s in symbols: | |
result.append(np.dot(s, conf.carriers)) | |
result = np.multiply(np.concatenate(result).real, gain) | |
return result | |
def demodulate_training(conf, signal, size): | |
signal = itertools.chain(signal, itertools.repeat(0)) | |
sampler = make_sampler(conf, signal) | |
symbols = demux(conf, sampler) | |
return np.array(list(itertools.islice(symbols, size))) | |
def write_symbol(conf, symbol, fd): | |
symbol = np.array(symbol) * conf.gain | |
symbol = symbol.real * conf.frequency_scaling | |
data = symbol.astype("int16").tobytes() | |
fd.write(data) | |
def write_equalization(conf, fd): | |
carriers = conf.carriers / len(conf.frequencies) | |
for value in conf.prefix_bytes: | |
write_symbol(conf, carriers[0] * value, fd) | |
silence = np.zeros(conf.silence_length * conf.num_symbols) | |
training = train_symbols(conf, conf.equalizer_length) | |
training = modulate_training(conf, training) | |
write_symbol(conf, silence, fd) | |
write_symbol(conf, training, fd) | |
write_symbol(conf, silence, fd) | |
def pack_block(conf, block): | |
frame = struct.pack(conf.prefix_fmt, len(block)) + block | |
return conf.ecc.encode(frame.ljust(conf.frame_size, b'\0')) | |
def encode_frame(conf, data): | |
for block in iterate( | |
data, | |
conf.payload_size, | |
bytearray, | |
truncate=False, | |
): | |
yield pack_block(conf, block) | |
yield pack_block(conf, conf.eof_block) | |
def decode_frame(conf, data): | |
data = iter(data) | |
while True: | |
chunk = bytearray(itertools.islice(data, conf.block_size)) | |
block, _, _ = conf.ecc.decode(chunk) | |
prefix_length = struct.calcsize(conf.prefix_fmt) | |
length = struct.unpack( | |
conf.prefix_fmt, | |
block[:prefix_length] | |
)[0] | |
frame = block[prefix_length:prefix_length+length] | |
if len(frame) < length: | |
raise ValueError("Missing payload data") | |
if len(frame) == 0: | |
return | |
yield frame | |
def decode_frames(conf, bits): | |
for frame in decode_frame(conf, to_bytes(conf, bits)): | |
yield bytes(frame) | |
def modem_encode(conf, bits): | |
for bits_chunk in iterate(bits, conf.bits_per_symbol, tuple): | |
yield conf.encode_map[bits_chunk] | |
def modem_decode(conf, symbols, error_handler): | |
for received in symbols: | |
error = np.abs(conf.symbols - received) | |
index = np.argmin(error) | |
decoded, bits = conf.decode_list[index] | |
error_handler(received, decoded) | |
yield bits | |
def write_payload(conf, bits, fd): | |
carriers = conf.carriers / len(conf.frequencies) | |
sent = 0 | |
symbols = itertools.chain(bits, [0] * conf.bits_per_baud * 2) | |
symbols = iterate(modem_encode(conf, symbols), len(carriers)) | |
for i, symbol in enumerate(symbols): | |
write_symbol(conf, np.dot(np.array(tuple(symbol)), carriers), fd) | |
sent = i * len(conf.frequencies) * conf.bits_per_symbol | |
if i % conf.baud_rate == 0: | |
print(f"{conf.info}: TX {sent / 8e3:.3f} kB", file=sys.stderr, end="\r") | |
print(f"{conf.info}: TX {sent / 8e3:.3f} kB", file=sys.stderr) | |
def send(conf, src, dst): | |
silence_start = np.zeros(int(conf.sampling_frequency * conf.silence_start)) | |
write_symbol(conf, silence_start, dst) | |
write_equalization(conf, dst) | |
payload = encode_payload(conf, itertools.chain.from_iterable(reader(src))) | |
write_payload(conf, payload, dst) | |
silence_stop = np.zeros(int(conf.sampling_frequency * conf.silence_stop)) | |
write_symbol(conf, silence_stop, dst) | |
def demux(conf, sampler): | |
filters = np.array([exp_iwt(-w, conf.num_symbols) / (0.5 * conf.num_symbols) for w in conf.omegas]) | |
while True: | |
frame = sampler(conf, conf.num_symbols) | |
if len(frame) != conf.num_symbols: | |
break | |
yield np.dot(filters, frame) | |
def detector_wait(conf, samples): | |
counter = 0 | |
max_offset = conf.timeout * conf.sampling_frequency | |
buffers = collections.deque([], maxlen=conf.baud_rate) | |
for offset, buffer in iterate_index(samples, conf.num_symbols): | |
if offset > max_offset: | |
raise ValueError("Timeout waiting for carrier") | |
buffers.append(buffer) | |
coeff = coherence(buffer, conf.index_omega) | |
counter = counter + 1 if abs(coeff) > conf.coherence_threshold else 0 | |
if counter == conf.carrier_threshold: | |
return offset, buffers | |
raise ValueError("No carrier detected") | |
def detector_run(conf, samples): | |
offset, bufs = detector_wait(conf, samples) | |
length = (conf.carrier_threshold - 1) * conf.num_symbols | |
begin = offset - length | |
start_time = begin * conf.symbol_duration / conf.num_symbols | |
bufs = list(bufs)[-conf.carrier_threshold - conf.search_window :] | |
n = conf.search_window + conf.carrier_duration - conf.carrier_threshold | |
trailing = list(itertools.islice(samples, n * conf.num_symbols)) | |
bufs.append(np.array(trailing)) | |
buf = np.concatenate(bufs) | |
offset = detector_find_start(conf, buf) | |
start_time += (offset / conf.num_symbols - conf.search_window) * conf.symbol_duration | |
buf = buf[offset:] | |
prefix_length = conf.carrier_duration * conf.num_symbols | |
amplitude, frequency_error = detector_estimate(conf, buf[:prefix_length]) | |
return (itertools.chain(buf, samples), amplitude, frequency_error) | |
def detector_find_start(conf, buf): | |
carrier = exp_iwt(conf.index_omega, conf.num_symbols) | |
carrier = np.tile(carrier, conf.start_pattern_length) | |
zeroes = carrier * 0.0 | |
signal = np.concatenate([zeroes, carrier]) | |
signal = (2**0.5) * signal / norm(signal) | |
corr = np.abs(np.correlate(buf, signal)) | |
norm_b = np.sqrt(np.correlate(np.abs(buf) ** 2, np.ones(len(signal)))) | |
coeffs = np.zeros_like(corr) | |
coeffs[norm_b > 0.0] = corr[norm_b > 0.0] / norm_b[norm_b > 0.0] | |
index = np.argmax(coeffs) | |
offset = index + len(zeroes) | |
return offset | |
def detector_estimate(conf, buffer, skip=5): | |
filters = exp_iwt(-conf.index_omega, conf.num_symbols) / (0.5 * conf.num_symbols) | |
frames = iterate(buffer, conf.num_symbols) | |
symbols = [np.dot(filters, frame) for frame in frames] | |
symbols = np.array(symbols[skip:-skip]) | |
amplitude = np.mean(np.abs(symbols)) | |
phase = np.unwrap(np.angle(symbols)) / (2 * np.pi) | |
indices = np.arange(len(phase)) | |
a, _ = linear_regression(indices, phase) | |
frequency_error = a / (conf.symbol_duration * conf.carrier_frequency) | |
return amplitude, frequency_error | |
def receiver_prefix(conf, symbols, gain=1.0): | |
S = take(symbols, len(conf.prefix_bytes)) | |
S = S[:, 0] * gain | |
sliced = np.round(np.abs(S)) | |
bits = np.array(sliced, dtype=int) | |
errors = bits != conf.prefix_bytes | |
if any(errors): | |
raise ValueError(f"Incorrect prefix: {sum(errors)} errors") | |
def receiver_train(conf, sampler, order, lookahead): | |
trained_symbols = train_symbols(conf, conf.equalizer_length) | |
trained_signal = modulate_training(conf, trained_symbols) * len(conf.frequencies) | |
prefix_bytes = conf.silence_length * conf.num_symbols | |
postfix_bytes = prefix_bytes | |
signal_length = conf.equalizer_length * conf.num_symbols + prefix_bytes + postfix_bytes | |
signal = sampler(conf, signal_length + lookahead) | |
padding = np.zeros(lookahead) | |
x = np.concatenate([signal[prefix_bytes:-postfix_bytes], padding]) | |
y = np.concatenate([padding, np.concatenate([trained_signal, np.zeros(lookahead)])]) | |
N = order + lookahead | |
Rxx = np.zeros(N) | |
Rxy = np.zeros(N) | |
for i in range(N): | |
Rxx[i] = np.dot(x[i:], x[: len(x) - i]) | |
Rxy[i] = np.dot(y[i:], x[: len(x) - i]) | |
coeffs = solver(t=Rxx, y=Rxy) | |
equalization_filter = FIR(coeffs) | |
equalized = list(equalization_filter(signal)) | |
equalized = equalized[prefix_bytes + lookahead : -postfix_bytes + lookahead] | |
verify_training(conf, equalized, trained_symbols) | |
return equalization_filter | |
def verify_training(conf, equalized, train_symbols): | |
equalizer_length = conf.equalizer_length | |
symbols = demodulate_training(conf, equalized, conf.equalizer_length) | |
sliced = np.array(symbols).round() | |
errors = np.array(sliced - train_symbols, dtype=bool) | |
error_rate = errors.sum() / errors.size | |
errors = np.array(symbols - train_symbols) | |
noise_rms = rms(errors) | |
signal_rms = rms(train_symbols) | |
SNRs = 20.0 * np.log10(signal_rms / noise_rms) | |
print(f"\nError rate: {error_rate}", file=sys.stderr, end="\n") | |
for (i, freq), snr in zip(enumerate(reversed(conf.frequencies)), reversed(SNRs)): | |
print(f"SNR: {freq}: {snr}", file=sys.stderr, end="\n") | |
def receiver_bitstream(conf, symbols, error_handler): | |
streams = [] | |
generators = split(symbols, n=len(conf.omegas)) | |
for frequency, S in zip(conf.frequencies, generators): | |
equalized = [] | |
S = icapture(S, result=equalized) | |
modem_error_handler = functools.partial(error_handler, frequency) | |
bits = modem_decode(conf, S, modem_error_handler) | |
streams.append(bits) | |
return zip(*streams) | |
def receiver_demodulate(conf, sampler, symbols): | |
received = 0 | |
errors = {} | |
noise = {} | |
def error_handler(frequency, received, decoded): | |
errors.setdefault(frequency, []).append(received / decoded) | |
noise.setdefault(frequency, []).append(received - decoded) | |
bitstream = receiver_bitstream(conf, symbols, error_handler) | |
try: | |
for i, block_of_bits in enumerate(bitstream): | |
for bits in block_of_bits: | |
received += len(bits) | |
yield bits | |
update_sampler(conf, errors, sampler) | |
if i % conf.bits_per_baud == 0: | |
print(f"{conf.info}: RX {received / 8e3:.3f} kB", file=sys.stderr, end="\r") | |
finally: | |
print(f"{conf.info}: RX {received / 8e3:.3f} kB", file=sys.stderr) | |
def update_sampler(conf, errors, sampler): | |
frequency_error_gain = 0.01 * conf.symbol_duration | |
err = np.array([e for v in errors.values() for e in v]) | |
err = np.mean(np.angle(err)) / (2 * np.pi) if err.size else 0 | |
errors.clear() | |
sampler.frequency -= frequency_error_gain * err | |
sampler.offset -= err | |
def receiver_run(conf, sampler, gain, output): | |
symbols = demux(conf, sampler) | |
receiver_prefix(conf, symbols, gain) | |
filters = receiver_train(conf, sampler, order=10, lookahead=10) | |
sampler.equalizer = lambda x: list(filters(x)) | |
bitstream = receiver_demodulate(conf, sampler, symbols) | |
bitstream = itertools.chain.from_iterable(bitstream) | |
for frame in decode_frames(conf, bitstream): | |
output.write(frame) | |
def make_sampler(conf, src, frequency=1.0, interpolate=False): | |
if interpolate: | |
return InterpolatedSampler(conf, src, frequency, interpolate) | |
else: | |
return Sampler(src) | |
def recv(conf, src, dst): | |
src_reader = reader(src, dtype=lambda x: np.frombuffer(x, dtype="int16") / conf.frequency_scaling) | |
signal = itertools.chain.from_iterable(src_reader) | |
take(signal, int(conf.sampling_frequency * conf.skip_start)) | |
signal, amplitude, frequency_error = detector_run(conf, signal) | |
sampler = make_sampler(conf, signal, 1 / (1.0 + frequency_error), interpolate=True) | |
receiver_run(conf, sampler, gain=1.0 / amplitude, output=dst) | |
dst.flush() | |
script_name = os.path.basename(__file__) | |
if script_name.endswith("-tx"): | |
send(make_conf(), sys.stdin.buffer, sys.stdout.buffer) | |
elif script_name.endswith("-rx"): | |
recv(make_conf(), sys.stdin.buffer, sys.stdout.buffer) | |
else: | |
print("Ambiguous script name, use -tx or -rx suffixes", file=sys.stderr) | |
exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Link it for sending and receiving modes:
MD Device Simulator: