Skip to content

Instantly share code, notes, and snippets.

@robotsorcerer
Last active October 12, 2020 01:07
Show Gist options
  • Save robotsorcerer/27e086e530d588b46958bb030af2c6b6 to your computer and use it in GitHub Desktop.
Save robotsorcerer/27e086e530d588b46958bb030af2c6b6 to your computer and use it in GitHub Desktop.
Simple Kalman filter implementation for a single state linear system.
from __future__ import print_function
'An easy implementation of the Kalman-Bucy Filter'
'Ref: http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.361.6851&rep=rep1&type=pdf'
import rospy. # you don't have to use rospy
import numpy as np
# handle_soro_pose is a function wrapper around a ros service that retrieves sensor observations in real time.
from scripts.service_clinet import handle_soro_pose
__author__ = 'Lekan P. Molu'
__date__ = 'October 10, 2020'
__email__ = 'patlekno@icloud.com'
class SimpleKalman(object):
def __init__(self, state_dim, sigma_pro=10, sigma_obs=10):
'''
state_dim = dimension of state
sigma_pro = process noise variance, Q
sigma_obs = observation noise covariance, R
'''
self.state_dim = state_dim
self.obs = np.zeros((state_dim, 1))
self.sigma_pro = sigma_pro
self.sigma_obs = sigma_obs
self.deltaT = None
'x(k-1|k-1) as state_dimx1'
self.state_prev_on_prev = self.cur_obs # initialize priori from sensor data as well; this is x(k-1|k-1)
self.cov_prev_on_prev = np.eye(self.state_dim). # set covariance state matrix as identity; P(k-1|k-1)
@property
def cur_obs(self):
' form z/x_init matrix as 2x1 '
start = rospy.get_time() # get time in float secs, feel free to use unix wall time
resp = handle_soro_pose(). # this is data from my sensor, amend as needed
obs = np.asarray(([[resp.min, 0]])).T
self.deltaT = rospy.get_time() - start
return obs
@property
def state_transition(self):
' F matrix as nxn linear matrix '
F = np.eye(self.state_dim)
F[0][1] = self.deltaT. # only because F is 2x2 in this case
return F
@property
def process_matrix(self):
'Q: process noise as an nxn random walk Gaussian set as N(0, Q(k)); n=2 here'
W = np.asarray(([self.deltaT**2/2, self.deltaT])).T
WWT = W.dot(W.T)
return WWT * self.sigma_pro**2
@property
def obs_matrix(self):
'H matrix as nxn matrix'
return np.eye(self.state_dim)
@property
def predict(self):
' update k|k-1 variables based on k-1|k-1 '
self.obs = self.cur_obs # important to call this first cause of deltaT
F = self.state_transition # get x(k|k-1)
self.state_cur_on_prev = F.dot(self.state_prev_on_prev)
self.cov_cur_on_prev = F.dot(self.cov_prev_on_prev).dot(F.T) + self.process_matrix
@property
def update(self):
invComputeArg = self.obs_matrix.dot(self.cov_cur_on_prev).dot(self.obs_matrix.T) + np.identity(self.state_dim)*self.sigma_obs
invCompute = np.linalg.inv(invComputeArg). # if state is big, you may want to explore Cholesky factorization here
self.gain_cur = self.cov_cur_on_prev.dot(self.obs_matrix.T).dot(invCompute)
self.z = self.obs
self.state_cur_on_cur = self.state_cur_on_prev + \
self.gain_cur.dot(self.z- \
self.obs_matrix.dot(self.state_cur_on_prev))
self.cov_cur_on_cur = (np.eye(self.state_dim) - self.gain_cur.dot(self.obs_matrix)).dot(self.cov_cur_on_prev)
@property
def correct(self):
'This is the motherlode of the class; call this to correct your sensor measurements'
self.predict
self.update
# update values for next iteration
self.cov_prev_on_prev = self.cov_cur_on_cur
self.state_prev_on_prev = self.state_cur_on_cur
return self.obs_matrix.dot(self.state_cur_on_cur)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment