Skip to content

Instantly share code, notes, and snippets.

@theWatchmen
Last active April 1, 2024 14:29
Show Gist options
  • Save theWatchmen/ba706afc2887d2c053951537e443c3c8 to your computer and use it in GitHub Desktop.
Save theWatchmen/ba706afc2887d2c053951537e443c3c8 to your computer and use it in GitHub Desktop.
Resampling impementation in Python
import numpy as np
from scipy import signal
import matplotlib.pyplot as plt
import time
import math
Fs = 44100
Fs_prime = 96000
A = 90
TEST_SAMPLES = 1000
Nz = 13
Nzd = Nz * 2 + 1
# Cutoff frequency = pi / L
Nn = 7
Neta = 6
Np = Nn + Neta
L = 1 << Nn
M = (Nzd) * L
K = 32
def beta():
if A > 50:
return 0.1102 * (A - 8.7)
elif A > 21:
return 0.5842 * (A - 21)**(2./5.) + 0.07886 * (A - 21)
else:
return 0.0
# From https://dsp.stackexchange.com/questions/37714/kaiser-window-approximation
def bessel_coeff():
result = np.zeros(K)
fact = 1
for k in range(K):
fact *= (k + 1)
exp = 2 ** k
result[k] = 1 / (fact * exp)**2
return result
def bessel(x, bc):
k = K - 1
x2 = x**2
y = x2 * bc[k]
for k in range(K - 2, 0, -1):
y = x2 * (bc[k] + y)
return y
def window(bc):
m = M // 2 + 1
b = beta()
b_beta = bessel(b, bc)
m2 = M//2
y = np.zeros(M)
for n in range(m2 + 1):
bn = b * ( 1 - ( n / m2 )**2 )**0.5
bb = bessel(bn, bc)
y[n + m2 - 1] = (1 / b_beta) * bb
y[m2 - n] = y[n + m2 - 1]
return y
def resample_window_sinc(o, r, w):
# Increase time by 1/Fs'
t_increment = 1 / Fs_prime
inteval_orig = 1 / Fs
# interval_filter = 1 / (Fs_prime / 2)
interval_filter = 0.5 / L
Lo = len(o)
rho = Fs_prime / Fs
input_increment = 1 / rho
dtb = int(input_increment * (1 << Np) + 0.5)
y = np.zeros(r.shape[0])
t = 0
# tb = (1 << Neta) - 1
tb = 0
it = 0
eta = 1
# l0 = 0
for n in range(y.shape[0]):
# Assume that the filter is centered at t. We assume x[n] and x[n + 1] of the original signal are at either side
n0 = int(t / inteval_orig)
tn0 = n0 * inteval_orig
tn1 = (n0 + 1) * inteval_orig
# l0 = int(t / interval_filter)
# tl0 = l0 * interval_filter
# tl1 = (l0 + 1) * interval_filter
# eta0 = abs( tn0 - tl0 ) / inteval_orig
lt = it * interval_filter
l0 = int(lt) + 97
# eta0 = abs(tl0 - tn0) / interval_filter
eta0 = 1 - ((t - tn0) / (inteval_orig))
# eta0 = 1 - ((t - tn0) / (interval_filter))
# eta0 = 1 - it if it == 0.0 or it == 1.0 else (1 - it) % 1.0
eta1 = 1 - eta0
v = 0
nn = (l0) % L
# nn = (n + 64) % L
# nn = (n0 + 64) % L
if True:
for z in range(-Nz, Nz + 1):
ni = n0 + z
if ni < 0 or ni >= Lo:
continue
# eta = eta0 if z <=0 else eta1
# h_hat = eta * (w[nn][z + Nz] - w[nn][z])
# v += o[ni] * ( w[nn][z + Nz] + h_hat )
v += o[ni] * eta0 * w[nn][z + Nz]
nn = (l0 + 1) % L
# nn = (n + 1 + 64) % L
# nn = (n0 + 1 + 64) % L
eta = 1 - eta
for z in range(-Nz, Nz + 1):
ni = n0 + 1 + z
if ni < 0 or ni >= Lo:
continue
# eta = eta0 if z <=0 else eta1
# h_hat = eta * (w[nn][z + Nz] - w[nn][z])
# v += o[ni] * ( w[nn][z + Nz] + h_hat )
v += o[ni] * eta1 * w[nn][z + Nz]
if False:
tb = tb & ((1 << Np) - 1)
l0 = tb >> Neta
eta = (tb & ((1 << Neta) - 1)) / ((1 << Neta) - 1)
for z in range(-Nz, Nz + 1):
ni = n0 + z
if ni < 0 or ni >= Lo:
continue
etai = eta if z <=0 else (1 - eta)
nni = nn if z <= 0 else (nn + 1) % L
w_hat = (w[nni][z + Nz + 1] - w[nni][z + Nz]) if z + Nz + 1 < (Nz * 2 + 1) else 0
h_hat = etai * w_hat
v += o[ni] * ( w[nni][z + Nz] + h_hat )
y[n] = v
eta = (eta + (1 / rho)) % 1.0
# l0 += 2
t += t_increment
tb += dtb
it += input_increment
return y
def resample_box(o, r):
t_increment = 1 / Fs_prime
inteval_orig = 1 / Fs
y = np.zeros(r.shape[0])
t = 0
for n in range(y.shape[0]):
n0 = int(t / inteval_orig)
ot = n0 * inteval_orig
eta = (t - ot) / inteval_orig
if eta <= 0.5:
y[n] = o[n0]
else:
new_n = min(len(o) - 1, n0 + 1)
y[n] = o[new_n]
t += t_increment
return y
if __name__ == '__main__':
bc = bessel_coeff()
x = np.array(np.arange(-16, 16, 1/M))
y = np.zeros(x.shape)
for i in range(y.shape[0]):
y[i] = bessel(x[i], bc)
w = window(bc)
# w_test = signal.get_window(('kaiser', beta()), Nzd)
xw = np.array(range(-M//2, M//2))
scale = min(1, Fs_prime/Fs)
omega = min(Fs,Fs_prime)
# x_sinc = np.arange(0, np.pi * Nz, np.pi / L)
# sinc = [1 if x == 0 else np.sin(x) / (x) for x in x_sinc]
fc = 0.5 / L
# sinc = [np.sin(2*np.pi * fc * i) / i if i != 0 else np.sin(2*np.pi * fc) for i in range(-M//2, M//2)]
sinc = [np.sin(2*np.pi * fc * i) / (2*np.pi * fc * i) if i != 0 else 1 for i in range(-M//2, M//2)]
sinc = [w[i] * sinc[i] for i in range(M)]
# sinc_sum = np.sum(sinc)
# sinc = sinc / sinc_sum
# sinc *= L
sinc = [0]*34 + sinc # TODO(marco): compute correct number of zeros
sinc = np.array(sinc)
fc_test = (Fs_prime / 2) / L
sinc_test = signal.firwin(M, fc_test, window=('kaiser', beta()), fs=Fs_prime)
sinc_test *= L
sinc_testr = sinc_test.reshape(Nzd, L).transpose()
sinc_test = np.concatenate([np.zeros(34), sinc_test])
g_ = math.gcd(Fs_prime, Fs)
up = Fs_prime // g_
down = Fs // g_
max_rate = max(up, down)
f_c = 1. / max_rate # cutoff of FIR filter (rel. to Nyquist)
half_len = 10 * max_rate # reasonable cutoff for sinc-like function
r_test3 = signal.firwin(2 * half_len + 1, f_c, window=('kaiser', beta()))
r_test3 *= up
r_test3 = np.concatenate([np.zeros(34), r_test3])
# sincr = sinc.reshape(Nzd, L).transpose()
sincr = np.zeros((L, Nzd))
for l in range(L):
for i in range(Nzd):
sincr[l][i] = sinc[L*i + l]
xsincr = np.array(range(-Nz, Nz + 1))
wr = w.reshape(Nzd, L).transpose()
xwr = np.array(range(-Nz, Nz + 1))
o_count = TEST_SAMPLES
xo = np.array(np.arange(0, 1, 1/o_count))
original = np.sin(400 * np.pi * 2 * xo)
r_count = TEST_SAMPLES * Fs_prime/Fs
xr = np.array(np.arange(0, 1, 1/r_count))
r = resample_window_sinc(original, xr, sincr)
r_test = resample_window_sinc(original, xr, sinc_testr)
# r = resample_box(original, xr)
# r_test2 = signal.resample_poly(original, 960, 441, window=('kaiser', beta()))
r_test2 = signal.upfirdn(r_test3, original, up, down)
fft_o = np.fft.fft(original)
fft_r = np.fft.fft(r)
fft_r_test2 = np.fft.fft(r_test2)
fft_sinc = np.fft.fft(sinc)
# fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, layout='constrained')
# ax1.plot(xo, original)
# ax2.plot(np.abs(fft_o[:fft_o.shape[0]//2]))
# ax3.plot(xr, r)
# ax4.plot(np.abs(fft_r[:fft_r.shape[0]//2]))
# for i in range(xwr.shape[0]):
# ax1.plot(xwr, wr[i])
# ax5.plot(xw, w)
wh, h = signal.freqz(sinc)
angles = np.unwrap(np.angle(h))
fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, layout='constrained')
# ax1.plot(sinc)
ax1.set_title('Individual Filters')
# for i in range(sincr.shape[0]):
ax1.plot(sincr[0], label='h(0)')
ax1.plot(sincr[31], label='h(31)')
ax1.plot(sincr[63], label='h(63)')
ax1.plot(sincr[97], label='h(97)')
ax1.plot(sincr[127], label='h(127)')
ax1.legend()
# ax1.plot(sinc, marker='o')
# ax1.plot(xsincr, sincr[0])
# ax1.plot(xsincr, sincr[64])
# ax1.plot(xsincr, sincr[127])
# ax1.plot(original)
# ax2.plot(xw, sinc)
# ax2.plot(xw, sinc_test)
ax2.set_title('Original')
# ax2.plot(sinc)
ax2.plot(original[:TEST_SAMPLES // 4])
# ax2.plot(r_test3)
# ax3.plot(np.abs(fft_sinc[:fft_sinc.shape[0]//2]))
# ax3.plot(r_test)
ax3.set_title('Output')
ax3.plot(r[:r.shape[0] // 4])
# ax3.plot(r_test2)
ax4.set_title('FFT Output')
ax4.plot(np.abs(fft_r[:fft_r.shape[0]//2]))
ax4.plot(np.abs(fft_o[:fft_o.shape[0]//2]))
# ax4.plot(np.abs(fft_r_test2[:fft_r.shape[0]//2]))
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment