Created
May 20, 2019 20:38
-
-
Save rosek86/49cbb415bbcc3fddc973e466e5af76f3 to your computer and use it in GitHub Desktop.
IIR filter design using tensorflow JS - concept
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as tf from '@tensorflow/tfjs-node'; | |
import { promises as fs } from 'fs'; | |
function buttap(N: number): { z: tf.Tensor<tf.Rank>, p: tf.Tensor<tf.Rank>, k: tf.Tensor<tf.Rank> } { | |
/* | |
Return (z,p,k) for analog prototype of Nth-order Butterworth filter. | |
The filter will have an angular (e.g. rad/s) cutoff frequency of 1. | |
See Also | |
-------- | |
butter : Filter design function using this prototype | |
*/ | |
if (Math.abs(Math.round(N)) !== N) { | |
throw new Error('Filter order must be a nonnegative integer'); | |
} | |
const z = tf.tensor([]); | |
const m = tf.range(-N+1, N, 2).mul(Math.PI / (2.0 * N)); | |
// Middle value is 0 to ensure an exactly real pole | |
const p = tf.exp(tf.complex(tf.zeros([ m.size ]), m)).neg(); | |
const k = tf.scalar(1); | |
return { z, p, k }; | |
} | |
function _relative_degree(z: tf.Tensor<tf.Rank>, p: tf.Tensor<tf.Rank>): number { | |
// Return relative degree of transfer function from zeros and poles | |
const degree = p.size - z.size; | |
if (degree < 0) { | |
throw new Error('Improper transfer function. Must have at least as many poles as zeros.'); | |
} | |
return degree | |
} | |
function lp2lp_zpk(z: tf.Tensor<tf.Rank>, p: tf.Tensor<tf.Rank>, k: tf.Tensor<tf.Rank>, | |
wo: tf.Tensor<tf.Rank> = tf.scalar(1.0)): { | |
z: tf.Tensor<tf.Rank>, p: tf.Tensor<tf.Rank>, k: tf.Tensor<tf.Rank> | |
} { | |
/* | |
Transform a lowpass filter prototype to a different frequency. | |
Return an analog low-pass filter with cutoff frequency `wo` | |
from an analog low-pass filter prototype with unity cutoff frequency, | |
using zeros, poles, and gain ('zpk') representation. | |
Parameters | |
---------- | |
z : array_like | |
Zeros of the analog filter transfer function. | |
p : array_like | |
Poles of the analog filter transfer function. | |
k : float | |
System gain of the analog filter transfer function. | |
wo : float | |
Desired cutoff, as angular frequency (e.g. rad/s). | |
Defaults to no change. | |
Returns | |
------- | |
z : ndarray | |
Zeros of the transformed low-pass filter transfer function. | |
p : ndarray | |
Poles of the transformed low-pass filter transfer function. | |
k : float | |
System gain of the transformed low-pass filter. | |
See Also | |
-------- | |
lp2hp_zpk, lp2bp_zpk, lp2bs_zpk, bilinear | |
lp2lp | |
Notes | |
----- | |
This is derived from the s-plane substitution | |
.. math:: s \rightarrow \frac{s}{\omega_0} | |
.. versionadded:: 1.1.0 | |
*/ | |
const degree = _relative_degree(z, p); | |
// Scale all points radially from origin to shift cutoff frequency | |
z = wo.mul(z); | |
p = wo.mul(p); | |
// Each shifted pole decreases gain by wo, each shifted zero increases it. | |
// Cancel out the net change to keep overall gain the same | |
k = k.mul(wo.pow(degree)); | |
return { z, p, k }; | |
} | |
function bilinear_zpk(z: tf.Tensor<tf.Rank>, p: tf.Tensor<tf.Rank>, k: tf.Tensor<tf.Rank>, fs: number) { | |
/* | |
Return a digital IIR filter from an analog one using a bilinear transform. | |
Transform a set of poles and zeros from the analog s-plane to the digital | |
z-plane using Tustin's method, which substitutes ``(z-1) / (z+1)`` for | |
``s``, maintaining the shape of the frequency response. | |
Parameters | |
---------- | |
z : array_like | |
Zeros of the analog filter transfer function. | |
p : array_like | |
Poles of the analog filter transfer function. | |
k : float | |
System gain of the analog filter transfer function. | |
fs : float | |
Sample rate, as ordinary frequency (e.g. hertz). No prewarping is | |
done in this function. | |
Returns | |
------- | |
z : ndarray | |
Zeros of the transformed digital filter transfer function. | |
p : ndarray | |
Poles of the transformed digital filter transfer function. | |
k : float | |
System gain of the transformed digital filter. | |
See Also | |
-------- | |
lp2lp_zpk, lp2hp_zpk, lp2bp_zpk, lp2bs_zpk | |
bilinear | |
Notes | |
----- | |
.. versionadded:: 1.1.0 | |
Examples | |
-------- | |
>>> from scipy import signal | |
>>> import matplotlib.pyplot as plt | |
>>> fs = 100 | |
>>> bf = 2 * np.pi * np.array([7, 13]) | |
>>> filts = signal.lti(*signal.butter(4, bf, btype='bandpass', analog=True, output='zpk')) | |
>>> filtz = signal.lti(*signal.bilinear_zpk(filts.zeros, filts.poles, filts.gain, fs)) | |
>>> wz, hz = signal.freqz_zpk(filtz.zeros, filtz.poles, filtz.gain) | |
>>> ws, hs = signal.freqs_zpk(filts.zeros, filts.poles, filts.gain, worN=fs*wz) | |
>>> plt.semilogx(wz*fs/(2*np.pi), 20*np.log10(np.abs(hz).clip(1e-15)), label=r'$|H(j \omega)|$') | |
>>> plt.semilogx(wz*fs/(2*np.pi), 20*np.log10(np.abs(hs).clip(1e-15)), label=r'$|H_z(e^{j \omega})|$') | |
>>> plt.legend() | |
>>> plt.xlabel('Frequency [Hz]') | |
>>> plt.ylabel('Magnitude [dB]') | |
>>> plt.grid() | |
*/ | |
const degree = _relative_degree(z, p); | |
const fs2 = tf.scalar(2.0 * fs); | |
// Bilinear transform the poles and zeros | |
let z_z = tf.add(fs2, z).div(tf.sub(fs2, z)); | |
let p_z = tf.add(fs2, p).div(tf.sub(fs2, p)); | |
// Any zeros that were at infinity get moved to the Nyquist frequency | |
z_z = z_z.concat(tf.ones([ degree ]).neg()); | |
// Compensate for gain change | |
const k_z = k.mul( | |
tf.real( | |
tf.div( | |
tf.prod(tf.sub(fs2, z)), | |
tf.prod(tf.sub(fs2, p)) | |
) | |
) | |
); | |
return { z: z_z, p: p_z, k: k_z }; | |
} | |
function poly(roots: tf.Tensor): tf.Tensor { | |
const n = roots.size; | |
const r = tf.split(roots, n); | |
// Declare an array for polynomial coefficient. | |
const coeffs: tf.Tensor[] = [...new Array(n + 1)].map(() => | |
tf.complex([ 0 ], [ 0 ]) | |
); | |
// Set highest order coefficient as 1 | |
coeffs[n] = tf.complex([ 1 ], [ 0 ]); | |
for (let i = 1; i <= n; i++) { | |
for (let j = n - i; j < n; j++) { | |
const x = tf.scalar(-1).mul(r[i - 1]).mul(coeffs[j + 1]); | |
coeffs[j] = tf.add(coeffs[j], x); | |
} | |
} | |
return tf.concat(coeffs.reverse()); | |
} | |
function iirfilter(N: number, Wn: number|number[], opts: { | |
rp: any, rs: any, btype: any, analog: any, | |
ftype: any, output: any, fs: any, | |
} = { | |
rp: null, rs: null, btype: 'band', | |
analog: false, ftype: 'butter', | |
output: 'ba', fs: null | |
}) { | |
Wn = Array.isArray(Wn) ? Wn : [ Wn ]; | |
if (Wn.some((x) => x <= 0 || x >= 1)) { | |
throw new Error('Digital filter critical frequencies must be 0 < Wn < 1'); | |
} | |
let { z, p, k } = buttap(N); | |
const fs = 2.0 | |
// 2 * fs * tan(pi * Wn / fs) | |
const warped = tf.scalar(2 * fs).mul( | |
tf.tan( | |
tf.scalar(Math.PI).mul( | |
tf.tensor1d(Wn) | |
).div(fs) | |
) | |
); | |
({ z, p, k } = lp2lp_zpk(z, p, k, warped)); | |
({ z, p, k } = bilinear_zpk(z, p, k, fs)); | |
return zpk2tf(z, p, k); | |
} | |
function zpk2tf(z: tf.Tensor, p: tf.Tensor, k: tf.Tensor): { b: tf.Tensor, a: tf.Tensor } { | |
const b = tf.real(k.mul(poly(z))); | |
const a = tf.real(poly(p)); | |
return { b, a }; | |
} | |
async function test() { | |
const samplingFreq = 50 | |
const cutoff = 1 | |
const order = 6 | |
const nyq = 0.5 * samplingFreq | |
const normal_cutoff = cutoff / nyq | |
const { b, a } = tf.tidy(() => { | |
const { b, a } = iirfilter(order, normal_cutoff); | |
return { b: b.dataSync(), a: a.dataSync() }; | |
}); | |
const data = tf.data.csv('file://out.csv'); | |
const laser1Dataset = await data.mapAsync<number>((x) => (x as any)['1']); | |
const laser1 = await laser1Dataset.toArray(); | |
const result = []; | |
const ys: number[] = [...new Array(b.length)].map(() => 2500); | |
const xs: number[] = [...new Array(a.length)].map(() => 2500); | |
for (const v of laser1) { | |
xs.unshift(v); xs.pop(); | |
ys.unshift(0); ys.pop(); | |
let y = 0; | |
for (let i = 0; i < xs.length; i++) { | |
y += xs[i] * b[i]; | |
y -= ys[i] * a[i]; | |
} | |
ys[0] = y; | |
result.push(y); | |
} | |
const resultSorted = result.slice().sort(); | |
const median = resultSorted[resultSorted.length/2]; | |
const min = resultSorted[0]; | |
console.log(median, min); | |
let csv = ''; | |
for (const v of result) { | |
csv += `${v}\r\n`; | |
} | |
await fs.writeFile('filt.csv', csv); | |
console.log(tf.getBackend()); | |
console.log(tf.memory()); | |
} | |
test(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment