Last active
October 12, 2020 01:07
-
-
Save robotsorcerer/27e086e530d588b46958bb030af2c6b6 to your computer and use it in GitHub Desktop.
Simple Kalman filter implementation for a single state linear system.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
'An easy implementation of the Kalman-Bucy Filter' | |
'Ref: http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.361.6851&rep=rep1&type=pdf' | |
import rospy. # you don't have to use rospy | |
import numpy as np | |
# handle_soro_pose is a function wrapper around a ros service that retrieves sensor observations in real time. | |
from scripts.service_clinet import handle_soro_pose | |
__author__ = 'Lekan P. Molu' | |
__date__ = 'October 10, 2020' | |
__email__ = 'patlekno@icloud.com' | |
class SimpleKalman(object): | |
def __init__(self, state_dim, sigma_pro=10, sigma_obs=10): | |
''' | |
state_dim = dimension of state | |
sigma_pro = process noise variance, Q | |
sigma_obs = observation noise covariance, R | |
''' | |
self.state_dim = state_dim | |
self.obs = np.zeros((state_dim, 1)) | |
self.sigma_pro = sigma_pro | |
self.sigma_obs = sigma_obs | |
self.deltaT = None | |
'x(k-1|k-1) as state_dimx1' | |
self.state_prev_on_prev = self.cur_obs # initialize priori from sensor data as well; this is x(k-1|k-1) | |
self.cov_prev_on_prev = np.eye(self.state_dim). # set covariance state matrix as identity; P(k-1|k-1) | |
@property | |
def cur_obs(self): | |
' form z/x_init matrix as 2x1 ' | |
start = rospy.get_time() # get time in float secs, feel free to use unix wall time | |
resp = handle_soro_pose(). # this is data from my sensor, amend as needed | |
obs = np.asarray(([[resp.min, 0]])).T | |
self.deltaT = rospy.get_time() - start | |
return obs | |
@property | |
def state_transition(self): | |
' F matrix as nxn linear matrix ' | |
F = np.eye(self.state_dim) | |
F[0][1] = self.deltaT. # only because F is 2x2 in this case | |
return F | |
@property | |
def process_matrix(self): | |
'Q: process noise as an nxn random walk Gaussian set as N(0, Q(k)); n=2 here' | |
W = np.asarray(([self.deltaT**2/2, self.deltaT])).T | |
WWT = W.dot(W.T) | |
return WWT * self.sigma_pro**2 | |
@property | |
def obs_matrix(self): | |
'H matrix as nxn matrix' | |
return np.eye(self.state_dim) | |
@property | |
def predict(self): | |
' update k|k-1 variables based on k-1|k-1 ' | |
self.obs = self.cur_obs # important to call this first cause of deltaT | |
F = self.state_transition # get x(k|k-1) | |
self.state_cur_on_prev = F.dot(self.state_prev_on_prev) | |
self.cov_cur_on_prev = F.dot(self.cov_prev_on_prev).dot(F.T) + self.process_matrix | |
@property | |
def update(self): | |
invComputeArg = self.obs_matrix.dot(self.cov_cur_on_prev).dot(self.obs_matrix.T) + np.identity(self.state_dim)*self.sigma_obs | |
invCompute = np.linalg.inv(invComputeArg). # if state is big, you may want to explore Cholesky factorization here | |
self.gain_cur = self.cov_cur_on_prev.dot(self.obs_matrix.T).dot(invCompute) | |
self.z = self.obs | |
self.state_cur_on_cur = self.state_cur_on_prev + \ | |
self.gain_cur.dot(self.z- \ | |
self.obs_matrix.dot(self.state_cur_on_prev)) | |
self.cov_cur_on_cur = (np.eye(self.state_dim) - self.gain_cur.dot(self.obs_matrix)).dot(self.cov_cur_on_prev) | |
@property | |
def correct(self): | |
'This is the motherlode of the class; call this to correct your sensor measurements' | |
self.predict | |
self.update | |
# update values for next iteration | |
self.cov_prev_on_prev = self.cov_cur_on_cur | |
self.state_prev_on_prev = self.state_cur_on_cur | |
return self.obs_matrix.dot(self.state_cur_on_cur) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment