Skip to content

Instantly share code, notes, and snippets.

@rosek86
Created May 20, 2019 20:38
Show Gist options
  • Save rosek86/49cbb415bbcc3fddc973e466e5af76f3 to your computer and use it in GitHub Desktop.
Save rosek86/49cbb415bbcc3fddc973e466e5af76f3 to your computer and use it in GitHub Desktop.
IIR filter design using tensorflow JS - concept
import * as tf from '@tensorflow/tfjs-node';
import { promises as fs } from 'fs';
function buttap(N: number): { z: tf.Tensor<tf.Rank>, p: tf.Tensor<tf.Rank>, k: tf.Tensor<tf.Rank> } {
/*
Return (z,p,k) for analog prototype of Nth-order Butterworth filter.
The filter will have an angular (e.g. rad/s) cutoff frequency of 1.
See Also
--------
butter : Filter design function using this prototype
*/
if (Math.abs(Math.round(N)) !== N) {
throw new Error('Filter order must be a nonnegative integer');
}
const z = tf.tensor([]);
const m = tf.range(-N+1, N, 2).mul(Math.PI / (2.0 * N));
// Middle value is 0 to ensure an exactly real pole
const p = tf.exp(tf.complex(tf.zeros([ m.size ]), m)).neg();
const k = tf.scalar(1);
return { z, p, k };
}
function _relative_degree(z: tf.Tensor<tf.Rank>, p: tf.Tensor<tf.Rank>): number {
// Return relative degree of transfer function from zeros and poles
const degree = p.size - z.size;
if (degree < 0) {
throw new Error('Improper transfer function. Must have at least as many poles as zeros.');
}
return degree
}
function lp2lp_zpk(z: tf.Tensor<tf.Rank>, p: tf.Tensor<tf.Rank>, k: tf.Tensor<tf.Rank>,
wo: tf.Tensor<tf.Rank> = tf.scalar(1.0)): {
z: tf.Tensor<tf.Rank>, p: tf.Tensor<tf.Rank>, k: tf.Tensor<tf.Rank>
} {
/*
Transform a lowpass filter prototype to a different frequency.
Return an analog low-pass filter with cutoff frequency `wo`
from an analog low-pass filter prototype with unity cutoff frequency,
using zeros, poles, and gain ('zpk') representation.
Parameters
----------
z : array_like
Zeros of the analog filter transfer function.
p : array_like
Poles of the analog filter transfer function.
k : float
System gain of the analog filter transfer function.
wo : float
Desired cutoff, as angular frequency (e.g. rad/s).
Defaults to no change.
Returns
-------
z : ndarray
Zeros of the transformed low-pass filter transfer function.
p : ndarray
Poles of the transformed low-pass filter transfer function.
k : float
System gain of the transformed low-pass filter.
See Also
--------
lp2hp_zpk, lp2bp_zpk, lp2bs_zpk, bilinear
lp2lp
Notes
-----
This is derived from the s-plane substitution
.. math:: s \rightarrow \frac{s}{\omega_0}
.. versionadded:: 1.1.0
*/
const degree = _relative_degree(z, p);
// Scale all points radially from origin to shift cutoff frequency
z = wo.mul(z);
p = wo.mul(p);
// Each shifted pole decreases gain by wo, each shifted zero increases it.
// Cancel out the net change to keep overall gain the same
k = k.mul(wo.pow(degree));
return { z, p, k };
}
function bilinear_zpk(z: tf.Tensor<tf.Rank>, p: tf.Tensor<tf.Rank>, k: tf.Tensor<tf.Rank>, fs: number) {
/*
Return a digital IIR filter from an analog one using a bilinear transform.
Transform a set of poles and zeros from the analog s-plane to the digital
z-plane using Tustin's method, which substitutes ``(z-1) / (z+1)`` for
``s``, maintaining the shape of the frequency response.
Parameters
----------
z : array_like
Zeros of the analog filter transfer function.
p : array_like
Poles of the analog filter transfer function.
k : float
System gain of the analog filter transfer function.
fs : float
Sample rate, as ordinary frequency (e.g. hertz). No prewarping is
done in this function.
Returns
-------
z : ndarray
Zeros of the transformed digital filter transfer function.
p : ndarray
Poles of the transformed digital filter transfer function.
k : float
System gain of the transformed digital filter.
See Also
--------
lp2lp_zpk, lp2hp_zpk, lp2bp_zpk, lp2bs_zpk
bilinear
Notes
-----
.. versionadded:: 1.1.0
Examples
--------
>>> from scipy import signal
>>> import matplotlib.pyplot as plt
>>> fs = 100
>>> bf = 2 * np.pi * np.array([7, 13])
>>> filts = signal.lti(*signal.butter(4, bf, btype='bandpass', analog=True, output='zpk'))
>>> filtz = signal.lti(*signal.bilinear_zpk(filts.zeros, filts.poles, filts.gain, fs))
>>> wz, hz = signal.freqz_zpk(filtz.zeros, filtz.poles, filtz.gain)
>>> ws, hs = signal.freqs_zpk(filts.zeros, filts.poles, filts.gain, worN=fs*wz)
>>> plt.semilogx(wz*fs/(2*np.pi), 20*np.log10(np.abs(hz).clip(1e-15)), label=r'$|H(j \omega)|$')
>>> plt.semilogx(wz*fs/(2*np.pi), 20*np.log10(np.abs(hs).clip(1e-15)), label=r'$|H_z(e^{j \omega})|$')
>>> plt.legend()
>>> plt.xlabel('Frequency [Hz]')
>>> plt.ylabel('Magnitude [dB]')
>>> plt.grid()
*/
const degree = _relative_degree(z, p);
const fs2 = tf.scalar(2.0 * fs);
// Bilinear transform the poles and zeros
let z_z = tf.add(fs2, z).div(tf.sub(fs2, z));
let p_z = tf.add(fs2, p).div(tf.sub(fs2, p));
// Any zeros that were at infinity get moved to the Nyquist frequency
z_z = z_z.concat(tf.ones([ degree ]).neg());
// Compensate for gain change
const k_z = k.mul(
tf.real(
tf.div(
tf.prod(tf.sub(fs2, z)),
tf.prod(tf.sub(fs2, p))
)
)
);
return { z: z_z, p: p_z, k: k_z };
}
function poly(roots: tf.Tensor): tf.Tensor {
const n = roots.size;
const r = tf.split(roots, n);
// Declare an array for polynomial coefficient.
const coeffs: tf.Tensor[] = [...new Array(n + 1)].map(() =>
tf.complex([ 0 ], [ 0 ])
);
// Set highest order coefficient as 1
coeffs[n] = tf.complex([ 1 ], [ 0 ]);
for (let i = 1; i <= n; i++) {
for (let j = n - i; j < n; j++) {
const x = tf.scalar(-1).mul(r[i - 1]).mul(coeffs[j + 1]);
coeffs[j] = tf.add(coeffs[j], x);
}
}
return tf.concat(coeffs.reverse());
}
function iirfilter(N: number, Wn: number|number[], opts: {
rp: any, rs: any, btype: any, analog: any,
ftype: any, output: any, fs: any,
} = {
rp: null, rs: null, btype: 'band',
analog: false, ftype: 'butter',
output: 'ba', fs: null
}) {
Wn = Array.isArray(Wn) ? Wn : [ Wn ];
if (Wn.some((x) => x <= 0 || x >= 1)) {
throw new Error('Digital filter critical frequencies must be 0 < Wn < 1');
}
let { z, p, k } = buttap(N);
const fs = 2.0
// 2 * fs * tan(pi * Wn / fs)
const warped = tf.scalar(2 * fs).mul(
tf.tan(
tf.scalar(Math.PI).mul(
tf.tensor1d(Wn)
).div(fs)
)
);
({ z, p, k } = lp2lp_zpk(z, p, k, warped));
({ z, p, k } = bilinear_zpk(z, p, k, fs));
return zpk2tf(z, p, k);
}
function zpk2tf(z: tf.Tensor, p: tf.Tensor, k: tf.Tensor): { b: tf.Tensor, a: tf.Tensor } {
const b = tf.real(k.mul(poly(z)));
const a = tf.real(poly(p));
return { b, a };
}
async function test() {
const samplingFreq = 50
const cutoff = 1
const order = 6
const nyq = 0.5 * samplingFreq
const normal_cutoff = cutoff / nyq
const { b, a } = tf.tidy(() => {
const { b, a } = iirfilter(order, normal_cutoff);
return { b: b.dataSync(), a: a.dataSync() };
});
const data = tf.data.csv('file://out.csv');
const laser1Dataset = await data.mapAsync<number>((x) => (x as any)['1']);
const laser1 = await laser1Dataset.toArray();
const result = [];
const ys: number[] = [...new Array(b.length)].map(() => 2500);
const xs: number[] = [...new Array(a.length)].map(() => 2500);
for (const v of laser1) {
xs.unshift(v); xs.pop();
ys.unshift(0); ys.pop();
let y = 0;
for (let i = 0; i < xs.length; i++) {
y += xs[i] * b[i];
y -= ys[i] * a[i];
}
ys[0] = y;
result.push(y);
}
const resultSorted = result.slice().sort();
const median = resultSorted[resultSorted.length/2];
const min = resultSorted[0];
console.log(median, min);
let csv = '';
for (const v of result) {
csv += `${v}\r\n`;
}
await fs.writeFile('filt.csv', csv);
console.log(tf.getBackend());
console.log(tf.memory());
}
test();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment