Skip to content

Instantly share code, notes, and snippets.

@y8
Last active August 29, 2015 14:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save y8/d211a0b48002e2941c1f to your computer and use it in GitHub Desktop.
Save y8/d211a0b48002e2941c1f to your computer and use it in GitHub Desktop.
Kalman filter: benchmark
require 'benchmark/ips'
require "matrix"
require "time"
class Kalman
attr_accessor :state
attr_accessor :process_noise
attr_reader :control
attr_accessor :model
attr_reader :readings
attr_reader :sensor_noise
attr_reader :identity
attr_accessor :error
attr_accessor :system_uncertainty
attr_accessor :kalman_gain
attr_accessor :current_measurement
attr_accessor :current_timestep
attr_accessor :update_time, :predict_time
def initialize
# x = Estimate value (intial state)
@state = Matrix[
[247.0], # Height,
[0.0], # Velocity
[0.0] # Acceleration
]
# P = Initial uncertainty (process noise)
@process_noise = Matrix[
[100, 0.0, 0.0], # We're not certian about height
[0.0, 100, 0.0], # We're not certain about velocity
[0.0, 0.0, 100], # We're not certain about acceleration
]
# u = External motion (control signal)
# There no control signal in our case, so it's just zeroes
@control = Matrix[
[0.0], # Height control signal, none
[0.0], # Velocity control signal, none
[0.0], # Acceleration control signal, none
]
# H = Measurement function (measured params)
@readings = Matrix[[
1.0, # We can measure height
0.0, # But we can't measure velocity
0.0 # Not acceleration
]].freeze
# R = Measurement uncertainty (measurement noise)
@sensor_noise = Matrix[
[0.6] # We're sensing only height, and there some error associated with height
]
# z = Current measurement (observed value)
@current_measurement = Matrix[
[0.0]
]
# y = Error
@error = Matrix[
[0.0]
]
# K = Kalman gain (current measurement uncertainty)
@kalman_gain = Matrix[
[0.0],
[0.0],
[0.0]
]
# S = System Uncertainty
@system_uncertainty = Matrix[
[0.0]
]
# I = Identity matrix
@identity = Matrix.identity(3).freeze
@update_time = 0.0
@predict_time = 0.0
end
# F = Next state function (process model)
# This is where everything is goes nuts:
# You need to come up with a matrix, that will represent a model
# of changes between states. So if you apply this matrix to previous
# state vector, you will get the next state prediction vector.
def model
Matrix[
[1.0, self.current_timestep, 0.0], # height = height.prev + timestep * velocity.prev
[0.0, 1.0, self.current_timestep], # velocity = velocity.prev + time * acceleration
[0.0, 0.0, 1.0] # acceleration = prev.acceleration
].freeze
end
def measure(value, timestep)
self.current_timestep = timestep
self.current_measurement = Matrix[
[value]
]
update!
predict!
end
def runtime_ms
(self.update_time + self.predict_time) * 1000.0
end
def to_s
{state: self.state, noise: self.process_noise}
end
alias_method :inspect, :to_s
private
def update!
time = Time.now
## Update stage
# y = z - (H * x)
self.error = self.current_measurement - (self.readings * self.state)
# S = H * P * Ht + R
self.system_uncertainty = self.readings * self.process_noise * self.readings.transpose + self.sensor_noise
# K = P * Ht + S^-1
self.kalman_gain = self.process_noise * self.readings.transpose * system_uncertainty.inverse
# x = x + (K * y)
self.state = self.state + (self.kalman_gain * error)
# P = (I - (K * H)) * P
self.process_noise = (self.identity - (kalman_gain * self.readings)) * self.process_noise
self.update_time = Time.now - time
end
def predict!
time = Time.now
## Prediction stage
# x = (F * x) + u
self.state = (self.model * self.state) + self.control
# P = F * P * Ft
self.process_noise = self.model * self.process_noise * self.model.transpose
self.predict_time = Time.now - time
end
def benchmark
time = Time.now.to_f
yield
return Time.now.to_f - time
end
end
last_value = 0.0
time_step = 0.1
increment = 0.05
kalman = Kalman.new
puts RUBY_DESCRIPTION
Benchmark.ips do |x|
# Configure the number of seconds used during
# the warmup phase (default 2) and calculation phase (default 5)
x.config(:time => 10, :warmup => 2)
# To reduce overhead, the number of iterations is passed in
# and the block must run the code the specific number of times.
# Used for when the workload is very small and any overhead
# introduces incorrectable errors.
x.report("kalman") do |iterations|
iterations.times do
kalman.measure last_value, time_step
last_value = last_value + increment
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment