Last active
April 1, 2024 14:29
-
-
Save theWatchmen/ba706afc2887d2c053951537e443c3c8 to your computer and use it in GitHub Desktop.
Resampling impementation in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from scipy import signal | |
import matplotlib.pyplot as plt | |
import time | |
import math | |
Fs = 44100 | |
Fs_prime = 96000 | |
A = 90 | |
TEST_SAMPLES = 1000 | |
Nz = 13 | |
Nzd = Nz * 2 + 1 | |
# Cutoff frequency = pi / L | |
Nn = 7 | |
Neta = 6 | |
Np = Nn + Neta | |
L = 1 << Nn | |
M = (Nzd) * L | |
K = 32 | |
def beta(): | |
if A > 50: | |
return 0.1102 * (A - 8.7) | |
elif A > 21: | |
return 0.5842 * (A - 21)**(2./5.) + 0.07886 * (A - 21) | |
else: | |
return 0.0 | |
# From https://dsp.stackexchange.com/questions/37714/kaiser-window-approximation | |
def bessel_coeff(): | |
result = np.zeros(K) | |
fact = 1 | |
for k in range(K): | |
fact *= (k + 1) | |
exp = 2 ** k | |
result[k] = 1 / (fact * exp)**2 | |
return result | |
def bessel(x, bc): | |
k = K - 1 | |
x2 = x**2 | |
y = x2 * bc[k] | |
for k in range(K - 2, 0, -1): | |
y = x2 * (bc[k] + y) | |
return y | |
def window(bc): | |
m = M // 2 + 1 | |
b = beta() | |
b_beta = bessel(b, bc) | |
m2 = M//2 | |
y = np.zeros(M) | |
for n in range(m2 + 1): | |
bn = b * ( 1 - ( n / m2 )**2 )**0.5 | |
bb = bessel(bn, bc) | |
y[n + m2 - 1] = (1 / b_beta) * bb | |
y[m2 - n] = y[n + m2 - 1] | |
return y | |
def resample_window_sinc(o, r, w): | |
# Increase time by 1/Fs' | |
t_increment = 1 / Fs_prime | |
inteval_orig = 1 / Fs | |
# interval_filter = 1 / (Fs_prime / 2) | |
interval_filter = 0.5 / L | |
Lo = len(o) | |
rho = Fs_prime / Fs | |
input_increment = 1 / rho | |
dtb = int(input_increment * (1 << Np) + 0.5) | |
y = np.zeros(r.shape[0]) | |
t = 0 | |
# tb = (1 << Neta) - 1 | |
tb = 0 | |
it = 0 | |
eta = 1 | |
# l0 = 0 | |
for n in range(y.shape[0]): | |
# Assume that the filter is centered at t. We assume x[n] and x[n + 1] of the original signal are at either side | |
n0 = int(t / inteval_orig) | |
tn0 = n0 * inteval_orig | |
tn1 = (n0 + 1) * inteval_orig | |
# l0 = int(t / interval_filter) | |
# tl0 = l0 * interval_filter | |
# tl1 = (l0 + 1) * interval_filter | |
# eta0 = abs( tn0 - tl0 ) / inteval_orig | |
lt = it * interval_filter | |
l0 = int(lt) + 97 | |
# eta0 = abs(tl0 - tn0) / interval_filter | |
eta0 = 1 - ((t - tn0) / (inteval_orig)) | |
# eta0 = 1 - ((t - tn0) / (interval_filter)) | |
# eta0 = 1 - it if it == 0.0 or it == 1.0 else (1 - it) % 1.0 | |
eta1 = 1 - eta0 | |
v = 0 | |
nn = (l0) % L | |
# nn = (n + 64) % L | |
# nn = (n0 + 64) % L | |
if True: | |
for z in range(-Nz, Nz + 1): | |
ni = n0 + z | |
if ni < 0 or ni >= Lo: | |
continue | |
# eta = eta0 if z <=0 else eta1 | |
# h_hat = eta * (w[nn][z + Nz] - w[nn][z]) | |
# v += o[ni] * ( w[nn][z + Nz] + h_hat ) | |
v += o[ni] * eta0 * w[nn][z + Nz] | |
nn = (l0 + 1) % L | |
# nn = (n + 1 + 64) % L | |
# nn = (n0 + 1 + 64) % L | |
eta = 1 - eta | |
for z in range(-Nz, Nz + 1): | |
ni = n0 + 1 + z | |
if ni < 0 or ni >= Lo: | |
continue | |
# eta = eta0 if z <=0 else eta1 | |
# h_hat = eta * (w[nn][z + Nz] - w[nn][z]) | |
# v += o[ni] * ( w[nn][z + Nz] + h_hat ) | |
v += o[ni] * eta1 * w[nn][z + Nz] | |
if False: | |
tb = tb & ((1 << Np) - 1) | |
l0 = tb >> Neta | |
eta = (tb & ((1 << Neta) - 1)) / ((1 << Neta) - 1) | |
for z in range(-Nz, Nz + 1): | |
ni = n0 + z | |
if ni < 0 or ni >= Lo: | |
continue | |
etai = eta if z <=0 else (1 - eta) | |
nni = nn if z <= 0 else (nn + 1) % L | |
w_hat = (w[nni][z + Nz + 1] - w[nni][z + Nz]) if z + Nz + 1 < (Nz * 2 + 1) else 0 | |
h_hat = etai * w_hat | |
v += o[ni] * ( w[nni][z + Nz] + h_hat ) | |
y[n] = v | |
eta = (eta + (1 / rho)) % 1.0 | |
# l0 += 2 | |
t += t_increment | |
tb += dtb | |
it += input_increment | |
return y | |
def resample_box(o, r): | |
t_increment = 1 / Fs_prime | |
inteval_orig = 1 / Fs | |
y = np.zeros(r.shape[0]) | |
t = 0 | |
for n in range(y.shape[0]): | |
n0 = int(t / inteval_orig) | |
ot = n0 * inteval_orig | |
eta = (t - ot) / inteval_orig | |
if eta <= 0.5: | |
y[n] = o[n0] | |
else: | |
new_n = min(len(o) - 1, n0 + 1) | |
y[n] = o[new_n] | |
t += t_increment | |
return y | |
if __name__ == '__main__': | |
bc = bessel_coeff() | |
x = np.array(np.arange(-16, 16, 1/M)) | |
y = np.zeros(x.shape) | |
for i in range(y.shape[0]): | |
y[i] = bessel(x[i], bc) | |
w = window(bc) | |
# w_test = signal.get_window(('kaiser', beta()), Nzd) | |
xw = np.array(range(-M//2, M//2)) | |
scale = min(1, Fs_prime/Fs) | |
omega = min(Fs,Fs_prime) | |
# x_sinc = np.arange(0, np.pi * Nz, np.pi / L) | |
# sinc = [1 if x == 0 else np.sin(x) / (x) for x in x_sinc] | |
fc = 0.5 / L | |
# sinc = [np.sin(2*np.pi * fc * i) / i if i != 0 else np.sin(2*np.pi * fc) for i in range(-M//2, M//2)] | |
sinc = [np.sin(2*np.pi * fc * i) / (2*np.pi * fc * i) if i != 0 else 1 for i in range(-M//2, M//2)] | |
sinc = [w[i] * sinc[i] for i in range(M)] | |
# sinc_sum = np.sum(sinc) | |
# sinc = sinc / sinc_sum | |
# sinc *= L | |
sinc = [0]*34 + sinc # TODO(marco): compute correct number of zeros | |
sinc = np.array(sinc) | |
fc_test = (Fs_prime / 2) / L | |
sinc_test = signal.firwin(M, fc_test, window=('kaiser', beta()), fs=Fs_prime) | |
sinc_test *= L | |
sinc_testr = sinc_test.reshape(Nzd, L).transpose() | |
sinc_test = np.concatenate([np.zeros(34), sinc_test]) | |
g_ = math.gcd(Fs_prime, Fs) | |
up = Fs_prime // g_ | |
down = Fs // g_ | |
max_rate = max(up, down) | |
f_c = 1. / max_rate # cutoff of FIR filter (rel. to Nyquist) | |
half_len = 10 * max_rate # reasonable cutoff for sinc-like function | |
r_test3 = signal.firwin(2 * half_len + 1, f_c, window=('kaiser', beta())) | |
r_test3 *= up | |
r_test3 = np.concatenate([np.zeros(34), r_test3]) | |
# sincr = sinc.reshape(Nzd, L).transpose() | |
sincr = np.zeros((L, Nzd)) | |
for l in range(L): | |
for i in range(Nzd): | |
sincr[l][i] = sinc[L*i + l] | |
xsincr = np.array(range(-Nz, Nz + 1)) | |
wr = w.reshape(Nzd, L).transpose() | |
xwr = np.array(range(-Nz, Nz + 1)) | |
o_count = TEST_SAMPLES | |
xo = np.array(np.arange(0, 1, 1/o_count)) | |
original = np.sin(400 * np.pi * 2 * xo) | |
r_count = TEST_SAMPLES * Fs_prime/Fs | |
xr = np.array(np.arange(0, 1, 1/r_count)) | |
r = resample_window_sinc(original, xr, sincr) | |
r_test = resample_window_sinc(original, xr, sinc_testr) | |
# r = resample_box(original, xr) | |
# r_test2 = signal.resample_poly(original, 960, 441, window=('kaiser', beta())) | |
r_test2 = signal.upfirdn(r_test3, original, up, down) | |
fft_o = np.fft.fft(original) | |
fft_r = np.fft.fft(r) | |
fft_r_test2 = np.fft.fft(r_test2) | |
fft_sinc = np.fft.fft(sinc) | |
# fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, layout='constrained') | |
# ax1.plot(xo, original) | |
# ax2.plot(np.abs(fft_o[:fft_o.shape[0]//2])) | |
# ax3.plot(xr, r) | |
# ax4.plot(np.abs(fft_r[:fft_r.shape[0]//2])) | |
# for i in range(xwr.shape[0]): | |
# ax1.plot(xwr, wr[i]) | |
# ax5.plot(xw, w) | |
wh, h = signal.freqz(sinc) | |
angles = np.unwrap(np.angle(h)) | |
fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, layout='constrained') | |
# ax1.plot(sinc) | |
ax1.set_title('Individual Filters') | |
# for i in range(sincr.shape[0]): | |
ax1.plot(sincr[0], label='h(0)') | |
ax1.plot(sincr[31], label='h(31)') | |
ax1.plot(sincr[63], label='h(63)') | |
ax1.plot(sincr[97], label='h(97)') | |
ax1.plot(sincr[127], label='h(127)') | |
ax1.legend() | |
# ax1.plot(sinc, marker='o') | |
# ax1.plot(xsincr, sincr[0]) | |
# ax1.plot(xsincr, sincr[64]) | |
# ax1.plot(xsincr, sincr[127]) | |
# ax1.plot(original) | |
# ax2.plot(xw, sinc) | |
# ax2.plot(xw, sinc_test) | |
ax2.set_title('Original') | |
# ax2.plot(sinc) | |
ax2.plot(original[:TEST_SAMPLES // 4]) | |
# ax2.plot(r_test3) | |
# ax3.plot(np.abs(fft_sinc[:fft_sinc.shape[0]//2])) | |
# ax3.plot(r_test) | |
ax3.set_title('Output') | |
ax3.plot(r[:r.shape[0] // 4]) | |
# ax3.plot(r_test2) | |
ax4.set_title('FFT Output') | |
ax4.plot(np.abs(fft_r[:fft_r.shape[0]//2])) | |
ax4.plot(np.abs(fft_o[:fft_o.shape[0]//2])) | |
# ax4.plot(np.abs(fft_r_test2[:fft_r.shape[0]//2])) | |
plt.show() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment