Last active
July 15, 2024 23:40
-
-
Save swang225/1cadaf1759561902b3ccc023987a65c9 to your computer and use it in GitHub Desktop.
Kalman Filter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import pandas as pd | |
# dropping tennis ball | |
def problem1(): | |
dt = 1 | |
F = np.matrix([[1, dt, 0], | |
[0, 1, dt], | |
[0, 0, 1]]) | |
G = np.matrix([[dt ** 2 / 2], | |
[dt], | |
[1]]) | |
sa = 1 | |
Q = G.dot(G.transpose()) * (sa ** 2) | |
H = np.matrix([1, 0, 0]) | |
# estimate | |
sz = 1 | |
R = sz ** 2 | |
# initial values | |
x0 = np.matrix([[20000], | |
[0], | |
[-9.81]]) # gravity | |
vc0 = np.matrix([[0, 0, 0], | |
[0, 0, 0], | |
[0, 0, 0]]) | |
return sa, sz, F, G, Q, H, R, x0, vc0 | |
def xk(F, xk1, G, sa): | |
_xk = F.dot(xk1) + G * np.random.normal(loc=0, scale=sa) | |
return _xk | |
def zk(xk, H, sz): | |
_zk = H.dot(xk) + np.random.normal(loc=0, scale=sz) | |
return _zk | |
def pxk_k1(F, pxk1): | |
''' predict x(k) given predicted x(k-1)''' | |
_pxk = F.dot(pxk1) | |
return _pxk | |
def pvck_k1(F, pvck1, Q): | |
''' predict vc(k) given predicted vc(k-1)''' | |
_pvck = F.dot(pvck1.dot(F.transpose())) + Q | |
return _pvck | |
def gaink(pvck, H, R): | |
# scale applied to gain, if noise is high, gain smaller | |
S = H.dot(pvck.dot(H.transpose())) + R | |
def inverse(A): | |
if isinstance(A, np.matrix): | |
return np.linalg.inv(A) | |
return 1/A | |
# calc gain | |
_gk = pvck.dot(H.transpose().dot(inverse(S))) | |
return _gk | |
def uxk(pxk, H, zk, gk): | |
_yk = zk - H.dot(pxk) | |
_uxk = pxk + gk.dot(_yk) | |
_uyk = zk - H.dot(_uxk) | |
return _uxk, _uyk | |
def uvck(pvck, gk, H): | |
_uvck = (np.identity(len(pvck)) - gk.dot(H)).dot(pvck) | |
return _uvck | |
def test001(problem): | |
sa, sz, F, G, Q, H, R, x0, vc0 = problem() | |
# generate experiment | |
x = [x0] | |
z = [H.dot(x0)] | |
t_range = 100 | |
for i in range(1, t_range + 1): | |
_xk = xk(F, x[-1], G, sa) | |
if _xk[0].item() < 0: | |
t_range = i - 1 | |
break | |
x.append(_xk) | |
z.append(zk(x[-1], H, sz)) | |
df_x = pd.DataFrame(data={'t': range(t_range + 1), | |
'x': [v[0].item() for v in x]}).set_index('t') | |
df_z = pd.DataFrame(data={'t': range(t_range + 1), | |
'z': [v.item() if v is not None else None for v in z]}).set_index('t') | |
# for each observation frequency, generate prediction | |
freqs = [5, 10, 50] | |
df_pxs = [] | |
for freq in freqs: | |
pvc = [vc0] # predicted vc matrix | |
px = [x0] # predicted position | |
uvc = [vc0] # updated vc matrix | |
ux = [x0] # updated position | |
uy = [np.matrix(0)] # updated difference between prediction and observation | |
for i in range(1, t_range + 1): | |
_pxk = pxk_k1(F, ux[-1]) | |
px.append(_pxk) | |
_pvck = pvck_k1(F, uvc[-1], Q) | |
pvc.append(_pvck) | |
_gk = gaink(_pvck, H, R) | |
if i % freq != 0: | |
ux.append(_pxk) | |
uy.append(None) | |
uvc.append(_pvck) | |
continue | |
_uxk, _uyk = uxk(_pxk, H, z[i], _gk) | |
ux.append(_uxk) | |
uy.append(_uyk) | |
_uvck = uvck(_pvck, _gk, H) | |
uvc.append(_uvck) | |
df_px = pd.DataFrame(data={'t': range(t_range + 1), | |
'predicted ' + str(freq): [v[0].item() for v in px]}).set_index('t') | |
df_pxs.append(df_px) | |
df = pd.concat([df_x, df_z] + df_pxs, axis=1) | |
return df | |
df = test001(problem1) | |
print(df) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment