Skip to content

Instantly share code, notes, and snippets.

@prydin
Created February 15, 2024 23:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save prydin/507f58d31617dbac869ac0df70da9252 to your computer and use it in GitHub Desktop.
Save prydin/507f58d31617dbac869ac0df70da9252 to your computer and use it in GitHub Desktop.
AM demodulation pipeline
# Baseband sample rate
import math
import matplotlib.pyplot as plt
import numpy
from scipy import signal, fft
import numpy as np
# RF sampling rate
rf_rate = 6e6
# Carrier frequency
carrier = 2e6
# IF sample rate
if_rate = 2e5
# Length of simulation run (must be power of 2)
n_samples = 65536
# Post filter downsample ratio
downsample_ratio = 256
class DCBlocker:
offset = 0
rate = 0.01
def process(self, x):
y = x - self.offset
self.offset += y * self.rate
return y
dc_blocker = DCBlocker()
class Filter:
p = 0
buffer = []
taps = []
def __init__(self, taps):
self.taps = taps
self.buffer = [0] * len(self.taps)
def process(self, x):
self.buffer[self.p] = x
self.p -= 1
if self.p < 0:
self.p = len(self.buffer) - 1
# Convolve with taps. Buffer grows towards lower index
# so convolution should iterate toward higher index
k = self.p
acc = 0
for i in range(len(self.buffer)):
acc += self.buffer[k] * self.taps[i]
k += 1
if k >= len(self.buffer):
k = 0
return acc
# 5kHz bandwidth filter
taps_5k = signal.firwin(100, 2000, fs=rf_rate, pass_zero="lowpass", window="hann")
filter_5k_i = Filter(taps_5k)
filter_5k_q = Filter(taps_5k)
class Filter:
taps = []
buffer = []
p = 0
def __init__(self, taps):
self.taps = taps[len(taps)/2:]
self.buffer = [0] * len(self.taps)
def process(self, x):
self.buffer[self.p] = x
self.p -= 1
if self.p < 0:
self.p = len(self.buffer) - 1
# Convolve with taps. Buffer grows towards lower index
# so convolution should iterate toward higher index
k = self.p
acc = 0
for i in range(len(p)):
acc += self.buffer[k] * self.taps[i]
k += 1
if k >= self.len(self.buffer):
k = 0
return acc
def osc(t, rate, f, phase):
return math.cos((t * f / rate) * 2 * math.pi + phase)
def gen_am(t, rate, f, siggen, weight):
return osc(t, rate, f, 0) * (siggen(t, rate) * weight + (1 - weight))
def tone_1khz(t, rate):
return osc(t, rate, 1000, 0)
def two_tones(t, rate):
return (osc(t, rate, 1000, 0) + osc(t, rate, 1500, math.pi / 4)) / 2
def tone_10khz(t, rate):
return osc(t, rate, 100000, 0)
def downshift(t, x, rate, f_baseband, f_i):
return x * osc(t, rate, f_baseband - f_i, 0)
def downsample(t, x, base_rate, new_rate, handler):
i = t * base_rate
if i % new_rate == 0:
handler(t, x, new_rate)
def to_quad(t, x, rate, f):
return x * osc(t, rate, f, 0), x * osc(t, rate, f, math.pi / 2)
def pipeline(t, rate):
# Simulate two AM stations transmitting with 50000kHz separation
x = gen_am(t, rate, carrier, two_tones, 0.5) + gen_am(t, carrier + 50000, rate, tone_1khz, 0.3)
# Downshift to baseband
i, q = to_quad(t, x, rate, carrier)
# Filter to desired bandwidth
i = filter_5k_i.process(i)
q = filter_5k_q.process(q)
# Combine I and Q (TODO: Is addition the right operation?)
x = i + q
# Remove DC
x = dc_blocker.process(x)
return x
def output_generator(rate, n_samples, downsample_ratio):
# EXTREMELY naive downsample algorithm. Just pick the sample
# that happens to be ad an even multiple of the downsample_ratio
t = 0
n = 0
for i in range(n_samples):
y = pipeline(t, rate)
t += 1
n += 1
if n >= downsample_ratio:
n = 0
yield y
def run():
audio_rate = rf_rate / downsample_ratio
v = list(output_generator(rf_rate, n_samples, downsample_ratio))
w = signal.windows.hann(len(v))
y = fft.fft(v*w, norm="forward")
ax = plt.subplot()
xaxis_gen = (i * audio_rate / len(v) for i in range(int(len(v) / 2)))
# ax.plot(list(xaxis_gen), 10*np.log(np.absolute(y[0:int(len(v)/2)])))
ax.plot(range(len(v)), v)
plt.show()
# Main entry point
run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment