@GordStephen
Created January 15, 2016 16:05
Implements a dual-estimation square-root unscented Kalman filter with univariate noise-free observations, a parameter-free linear measurement transform, nonlinear process evolution and general (non-additive) process noise.
# Note: written against Julia 0.4-era syntax (immutable types, Base.call overloading, etc.)
import Base.filter
"""
Implements a dual-estimation square-root unscented Kalman filter
with univariate noise-free observations, a parameter-free linear
measurement transform, nonlinear process evolution and general
(non-additive) process noise.
"""
immutable DualEstimationModel
F::Function # State evolution function (x_k-1 -> x_k)
H::Vector{Float64} # Measurement mapping (x_k -> y_k)
nx::Int # State dimensionality
nu::Int # Exogenous input dimensionality
nv::Int # Process noise dimensionality
np::Int # Parameter vector dimensionality
function DualEstimationModel(f::Function, H::Vector{Float64}, nu::Int, nv::Int, np::Int)
nx = length(H)
@assert size(f(zeros(nx), ones(nu), zeros(nv), zeros(np))) == (nx,)
F{T}(x_prev::Vector{T}, u_prev::Vector{T}, v::Vector{T}, p::Vector{T}) = f(x_prev, u_prev, v, p)
function F{T}(X_prev::Matrix{T}, u_prev::Vector{T}, V::Matrix{T}, p::Vector{T})
X = similar(X_prev)
@simd for i in 1:size(X,2)
X[:,i] = f(X_prev[:,i], u_prev, V[:,i], p)
end #for
return X
end #F
function F{T}(x_prev::Vector{T}, u_prev::Vector{T}, v::Vector{T}, P::Matrix{T})
X = Array{T}(length(x_prev), size(P,2))
@simd for i in 1:size(X,2)
X[:,i] = f(x_prev, u_prev, v, P[:,i])
end #for
return X
end #F
new(F, H, nx, nu, nv, np)
end #DualEstimationModel
end #DualEstimationModel
immutable DualEstimationFittable
model::DualEstimationModel
p₀::Vector{Float64} # Initial parameter estimate
Sₚ₀::Matrix{Float64} # Initial parameter estimate covariance (Cholesky factor)
Sᵣ₀::Matrix{Float64} # Parameter evolution innovation covariance (Cholesky factor)
x₀::Vector{Float64} # Initial state estimate
Sₓ₀::Matrix{Float64} # Initial state estimate covariance (Cholesky factor)
Sᵥ::Matrix{Float64} # State evolution innovation covariance (Cholesky factor)
σϵ::Float64 # Observation noise standard deviation
function DualEstimationFittable(model::DualEstimationModel,
p₀::Vector{Float64}, Sₚ₀::Matrix{Float64}, Sᵣ₀::Matrix{Float64},
x₀::Vector{Float64}, Sₓ₀::Matrix{Float64}, Sᵥ::Matrix{Float64}, σϵ::Float64)
@assert length(p₀) == model.np
@assert size(Sₚ₀) == (model.np, model.np)
@assert istril(Sₚ₀)
@assert size(Sᵣ₀) == (model.np, model.np)
@assert istril(Sᵣ₀)
@assert length(x₀) == model.nx
@assert size(Sₓ₀) == (model.nx, model.nx)
@assert istril(Sₓ₀)
@assert size(Sᵥ) == (model.nv, model.nv)
@assert istril(Sᵥ)
@assert σϵ > 0
new(model, p₀, Sₚ₀, Sᵣ₀, x₀, Sₓ₀, Sᵥ, σϵ)
end #DualEstimationFittable
end #DualEstimationFittable
# Make DualEstimationModel instances callable: model(p₀, Sₚ₀, ...) builds a DualEstimationFittable
Base.call(model::DualEstimationModel, p₀::Vector{Float64}, Sₚ₀::Matrix{Float64}, Sᵣ₀::Matrix{Float64},
x₀::Vector{Float64}, Sₓ₀::Matrix{Float64}, Sᵥ::Matrix{Float64}, σϵ::Float64) =
DualEstimationFittable(model, p₀, Sₚ₀, Sᵣ₀, x₀, Sₓ₀, Sᵥ, σϵ)
function simulate(fittable::DualEstimationFittable, u₀::Vector{Float64}, U::Matrix{Float64}, p::Vector{Float64}, n::Int)
F, H = fittable.model.F, fittable.model.H
x = Array{Float64}(fittable.model.nx, n)
xₜ, uₜ₋₁ = fittable.x₀, u₀
y = Array{Float64}(n)
y[1] = dot(H, fittable.x₀)
for t in 1:n
vₜ₋₁ = fittable.Sᵥ * randn(fittable.model.nv)
xₜ = F(xₜ, uₜ₋₁, vₜ₋₁, p)
x[:, t] = xₜ
y[t] = dot(H, xₜ) + fittable.σϵ*randn()
uₜ₋₁ = U[:,t]
end #for
return x, y
end #simulate
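# Classic unscented-transform sigma points: returns the sigma point matrix Σ, the row
# vector of mean weights Wᵐ, and the (complex) square roots of the covariance weights,
# complex because the zeroth covariance weight can be negative. Two alternative schemes
# (sparse Gauss quadrature and a Durbin-Koopman-style modification) remain commented
# out inside the function body.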
function sigmapoints{T<:Number}(a::Vector{T}, S::Matrix{T}, α::Float64=0.1, β::Float64=2., κ::Float64=3.0)
#=
if method == :SGQ
n = length(a)
nC2 = n*(n-1)÷2
function submatrix(a,b)
X, i = zeros(n, nC2), 1
for j in 1:n, k in j+1:n
X[j,i] = a
X[k,i] = b
i += 1
end #for
return X
end #submatrix
W = zeros(2n^2+1)
W[1] = n^2/18-7n/18+1
W[2:2n+1] = -n/18+2/9
W[2n+2:2n^2+1] = 1/36
Wᵐ = W'
sqrtWᶜ = sqrt(complex(W))
ps3, ms3 = sqrt(3), -sqrt(3)
Γ = [zeros(n) ps3*eye(n) ms3*eye(n) submatrix(ps3,ps3) submatrix(ms3,ms3) submatrix(ps3,ms3) submatrix(ms3,ps3)]
Σ = a.+S*Γ
elseif method == :MU #Durbin-Koopman unscented modification
L = length(a)
ζ = [1,2.]
J = length(ζ)
ϕζ = 1/sqrt(2π) * exp(-ζ.^2/2)
w₀ = 1 - (sum(ϕζ)*dot(ϕζ,ζ.^4))/(3*dot(ϕζ,ζ.^2)^2)
λ = sqrt(sum(ϕζ)/((1-w₀)*dot(ϕζ,ζ.^2)))
w = (1-w₀) / 2*L*sum(ϕζ)
Γ = zeros(L)
W = [w₀] #; w*ones(2*L*J)]
for j in 1:J
Γ = hcat(Γ, λ*ζ[j]*eye(L), -λ*ζ[j]*eye(L))
append!(W, w*ϕζ[j]*ones(2L))
end #for
Wᵐ = W'
sqrtWᶜ = W |> complex |> sqrt
Σ = a .+ S*Γ
else # classic unscented
=#
L = length(a)
λ = α^2*(L+κ)-L
Γ = sqrt(L+λ)
Wᵇ = .5/(L+λ)
vecWᵇ = Wᵇ * ones(2L)
Wᵐ = [λ/(L+λ) vecWᵇ']
sqrtWᶜ = [λ/(L+λ)+1-α^2+β; vecWᵇ] |> complex |> sqrt
Σ = [a a.+Γ*S a.-Γ*S]
#end #if
return Σ, Wᵐ, sqrtWᶜ
end #function
function gen_loglikelihood(fittable::DualEstimationFittable, y::Vector{Float64},
u₀::Vector{Float64}=zeros(fittable.model.nu), u::Matrix{Float64}=zeros(fittable.model.nu, length(y)),
α::Float64=0.1, β::Float64=2., κ::Float64=3.)
n = length(y)
nx, nu, nv = fittable.model.nx, fittable.model.nu, fittable.model.nv
@assert size(u) == (nu, n)
F, H = fittable.model.F, fittable.model.H'
xᶠₜ, Sₓᶠ, uₜ = fittable.x₀, fittable.Sₓ₀, u₀
Sᵥ, σₑ = fittable.Sᵥ, fittable.σϵ
nany = isnan(y)
L = nx+nv+1
znx = zeros(nx)
xaug = zeros(L)
function xᵃ(x)
xaug[1:nx] = x
return xaug
end #xᵃ
Saug = zeros(L,L)
Saug[nx+1:nx+nv, nx+1:nx+nv] = Sᵥ
Saug[end] = σₑ
function Sᵃ(Sₓ)
Saug[1:nx,1:nx] = Sₓ
return Saug
end #Sᵃ
λ = α^2*(L+κ)-L
Γ = sqrt(L+λ)
Wᵇ = .5/(L+λ)
vecWᵇ = Wᵇ * ones(2L)
Wᵐ = [λ/(L+λ) vecWᵇ']
sqrtWᶜ = [λ/(L+λ)+1-α^2+β; vecWᵇ] |> complex |> sqrt
Σ(a::Vector{Float64}, S::Matrix{Float64}) = [a a.+Γ*S a.-Γ*S]
function loglikelihood(p::Vector{Float64})
xᶠₜ, Sₓᶠ, uₜ = fittable.x₀, fittable.Sₓ₀, u₀ # reset the filter state so repeated calls are independent
ll = -(n - sum(nany))*log(2π)/2 # constant term counts only the observed (non-NaN) points
@inbounds @fastmath for t in 1:n
xₜ₋₁, Sₓₜ₋₁ = xᶠₜ, Sₓᶠ
uₜ₋₁, yₜ, nanyₜ = uₜ, y[t], nany[t]
χ = Σ(xᵃ(xₜ₋₁), Sᵃ(Sₓₜ₋₁))
Xₜ₋₁, Vₜ₋₁, Eₜ = χ[1:nx,:], χ[nx+1:nx+nv,:], χ[nx+nv+1, :]
noise = 1e-9randn(nx,L)
Xₜ = F(Xₜ₋₁, uₜ₋₁, Vₜ₋₁, p) + [znx noise -noise]
xᵖₜ = vec(sum(Wᵐ.*Xₜ, 2))
Yₜ = H*Xₜ + Eₜ
yᵖₜ = vecdot(Wᵐ, Yₜ)
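# Square-root measurement update: the QR of the weighted, stacked observation/state
# deviations yields a lower-triangular factor A whose leading entry acts as the
# innovation standard deviation, whose remaining first column scales the Kalman gain,
# and whose trailing block is the updated state covariance factor.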
A = real(qr(sqrtWᶜ.*[(Yₜ .- yᵖₜ); (Xₜ .- xᵖₜ)]')[2]')
F⁻¹ϵ = (yₜ - yᵖₜ) / A[1]
xᶠₜ = nanyₜ ? xᵖₜ : xᵖₜ + A[2:end,1] * F⁻¹ϵ
Sₓᶠ = A[2:end, 2:end]
!nanyₜ && (ll -= (F⁻¹ϵ*F⁻¹ϵ + log(A[1]^2))/2)
uₜ = u[:,t]
end #for
return ll
end #loglikelihood
return loglikelihood
end #gen_loglikelihood
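# Dual filter: two coupled square-root unscented filters per time step, one over the
# state (with the parameters fixed at their current estimate) and one over the
# parameters (propagating the previous state estimate through F), with the forgetting
# factor λ shrinking the parameter innovation factor Sᵣ each step.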
function dualfilter(fittable::DualEstimationFittable, y::Vector{Float64}, u₀::Vector{Float64}=zeros(fittable.model.nu),
u::Matrix{Float64}=zeros(fittable.model.nu, length(y)), λ::Float64=.99, α::Float64=0.1, β::Float64=2., κ::Float64=3.)
@assert 0 ≤ λ ≤ 1
n = length(y)
nx, nu, nv, np = fittable.model.nx, fittable.model.nu, fittable.model.nv, fittable.model.np
@assert size(u) == (nu, n)
F, H = fittable.model.F, fittable.model.H'
xᶠₜ, Sₓᶠ, uₜ = fittable.x₀, fittable.Sₓ₀, u₀
pᶠₜ, Sₚᶠ = fittable.p₀, fittable.Sₚ₀
Sᵥ, Sᵣ, σₑ = fittable.Sᵥ, fittable.Sᵣ₀, fittable.σϵ
λ₁ = 1 - λ
λ₂ = sqrt(λ/λ₁)
x, xᵖ = Array{Float64}(nx, n), Array{Float64}(nx, n)
p, pᵖ = Array{Float64}(np, n), Array{Float64}(np, n)
nany = isnan(y)
for t in 1:n
#print("\r$t")
xₜ₋₁, Sₓₜ₋₁ = xᶠₜ, Sₓᶠ
pₜ₋₁, Sₚₜ₋₁ = pᶠₜ, Sₚᶠ
uₜ₋₁, yₜ, nanyₜ = uₜ, y[t], nany[t]
# Filter x
χ, Wᵐ, sqrtWᶜ = sigmapoints([xₜ₋₁; zeros(nv+1)],
[Sₓₜ₋₁ zeros(nx,nv+1); zeros(nv,nx) Sᵥ zeros(nv,1); zeros(1,nx+nv) σₑ], α, β, κ)
Xₜ₋₁, Vₜ₋₁, Eₜ = χ[1:nx,:], χ[nx+1:nx+nv,:], χ[nx+nv+1, :]
noise = 1e-9randn(nx,(nx+nv+1))
Xₜ = F(Xₜ₋₁, uₜ₋₁, Vₜ₋₁, pₜ₋₁) + [zeros(nx) noise -noise]
xᵖₜ = vec(sum(Wᵐ.*Xₜ, 2))
Yₜ = H*Xₜ + Eₜ
yᵖₜ = vecdot(Wᵐ, Yₜ)
A = real(qr(sqrtWᶜ.*[(Yₜ .- yᵖₜ); (Xₜ .- xᵖₜ)]')[2]')
xᶠₜ = nanyₜ ? xᵖₜ : xᵖₜ + A[2:end,1] / A[1] * (yₜ - yᵖₜ)
Sₓᶠ = A[2:end, 2:end]
# Filter p
Sᵣ = λ*Sᵣ
ρ, Wᵐ, sqrtWᶜ = sigmapoints([pₜ₋₁; zeros(np+1)],
[Sₚₜ₋₁ zeros(np,np+1); zeros(np,np) Sᵣ zeros(np,1); zeros(1,2np) σₑ], α, β, κ)
Pₜ₋₁, Rₜ₋₁, Eₜ = ρ[1:np,:], ρ[np+1:2np,:], ρ[2np+1, :]
Pₜ = Pₜ₋₁ + Rₜ₋₁
pᵖₜ = vec(sum(Wᵐ.*Pₜ, 2))
Yₜ = H*F(xₜ₋₁, uₜ₋₁, zeros(nv), Pₜ) + Eₜ
yᵖₜ = vecdot(Wᵐ, Yₜ)
A = real(qr(sqrtWᶜ.*[(Yₜ .- yᵖₜ); (Pₜ .- pᵖₜ)]')[2]')
K = A[2:end,1] / A[1]
pᶠₜ = nanyₜ ? pᵖₜ : pᵖₜ + K * (yₜ - yᵖₜ)
Sₚᶠ = A[2:end, 2:end]
#Sᵣ = fittable.λ*Sᵣ
#Sᵣ = λ₁*cholupdate!(Sᵣ, λ₂*K*(yₜ - vecdot(H,xᶠₜ)))
#xᵖ[:,t], pᵖ[:,t] = xᵖₜ, pᵖₜ
x[:,t], p[:,t] = xᶠₜ, pᶠₜ
uₜ = u[:,t]
end #for
return x, p, Sₚᶠ
end #dualfilter
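# Fixed-parameter square-root unscented smoother: a forward filtering pass that also
# stores the smoother gains D and residual factors Zˢ, followed by a backward
# Rauch-Tung-Striebel-style pass over the stored quantities.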
function singlesmooth(fittable::DualEstimationFittable, y::Vector{Float64}, p::Vector{Float64},
u₀::Vector{Float64}=zeros(fittable.model.nu), u::Matrix{Float64}=zeros(fittable.model.nu, length(y)),
α::Float64=0.1, β::Float64=2., κ::Float64=3.)
n = length(y)
nx, nu, nv = fittable.model.nx, fittable.model.nu, fittable.model.nv
@assert size(u) == (nu, n)
F, H = fittable.model.F, fittable.model.H'
xᶠₜ, Sₓᶠ, uₜ = fittable.x₀, full(fittable.Sₓ₀), u₀
Sᵥ, σₑ = full(fittable.Sᵥ), fittable.σϵ
x, xᵖ = Array{Float64}(nx, n), Array{Float64}(nx, n)
D = Array{Float64}(nx, nx, n)
Sₓˢ = Zˢ = Array{Float64}(nx, nx, n) #Store Sₓˢ in Zˢ to save space
nany = isnan(y)
for t in 1:n
xₜ₋₁, Sₓₜ₋₁ = xᶠₜ, Sₓᶠ
uₜ₋₁, pₜ₋₁ = uₜ, p
yₜ, nanyₜ = y[t], nany[t]
χ, Wᵐ, sqrtWᶜ = sigmapoints([xₜ₋₁; zeros(nv+1)],
[Sₓₜ₋₁ zeros(nx,nv+1); zeros(nv,nx) Sᵥ zeros(nv,1); zeros(1,nx+nv) σₑ], α, β, κ)
Xₜ₋₁, Vₜ₋₁, Eₜ = χ[1:nx,:], χ[nx+1:nx+nv,:], χ[nx+nv+1, :]
noise = 1e-9randn(nx,(nx+nv+1))
Xₜ = F(Xₜ₋₁, uₜ₋₁, Vₜ₋₁, pₜ₋₁) + [zeros(nx) noise -noise]
xᵖₜ = vec(sum(Wᵐ.*Xₜ, 2))
A = real(qr(sqrtWᶜ.*[(Xₜ .- xᵖₜ); (Xₜ₋₁ .- xₜ₋₁)]')[2]')
Zˢ[:,:,t] = A[nx+1:end, nx+1:end]
D[:,:,t] = A[nx+1:end, 1:nx] / A[1:nx, 1:nx]
Yₜ = H*Xₜ + Eₜ
yᵖₜ = vecdot(Wᵐ, Yₜ)
A = real(qr(sqrtWᶜ.*[(Yₜ .- yᵖₜ); (Xₜ .- xᵖₜ)]')[2]')
xᶠₜ = nanyₜ ? xᵖₜ : xᵖₜ + A[2:end,1] / A[1] * (yₜ - yᵖₜ)
Sₓᶠ = A[2:end, 2:end]
xᵖ[:,t], x[:,t] = xᵖₜ, xᶠₜ
uₜ = u[:,t]
#print("\r$t")
end #for
Sₓˢ[:,:,n] = Sₓᶠ
for t in n-1:-1:1
Dₜ = D[:,:,t]
x[:,t] = x[:,t] + Dₜ * (x[:,t+1] - xᵖ[:,t+1])
Sₓˢ[:,:,t] = qr([Zˢ[:,:,t] Dₜ*Sₓˢ[:,:,t+1]]')[2]'
#print("\r$t")
end #for
return xᵖ, x, Sₓˢ
end #singlesmooth
immutable DualEstimationFit
fittable::DualEstimationFittable
y::Vector{Float64}
u::Matrix{Float64}
x::Matrix{Float64}
P_x::Array{Float64,3}
p::Vector{Float64}
P_p::Matrix{Float64}
function DualEstimationFit(fittable::DualEstimationFittable,
y::Vector{Float64}, u::Matrix{Float64},
x::Matrix{Float64}, P_x::Array{Float64, 3},
p::Vector{Float64}, P_p::Matrix{Float64})
n = length(y)
nx = fittable.model.nx
np = fittable.model.np
nu = fittable.model.nu
@assert size(u) == (n, nu)
@assert size(x) == (n, nx)
@assert size(P_x) == (nx, nx, n)
@assert size(p) == (np,)
@assert size(P_p) == (np, np)
new(fittable, y, u, x, P_x, p, P_p)
end #DualEstimationFit
end #DualEstimationFit
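# `symprod!` (used in `fixedparametersmooth` below) is not defined in this gist. The
# following is a minimal sketch of the assumed helper, squaring each lower-triangular
# Cholesky-factor slice S[:,:,t] into the full covariance S*S' in place:
function symprod!(S::Array{Float64,3})
for t in 1:size(S,3)
Sₜ = S[:,:,t]
S[:,:,t] = Sₜ*Sₜ'
end #for
return S
end #symprod!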
function fixedparametersmooth(modelinstance::DualEstimationFittable, y::Vector{Float64}, u₀::Vector{Float64}, u::Matrix{Float64},
λ::Float64=.99, α::Float64=1e-2, β::Float64=2., κ::Float64=-1.)
xfs, pfs, Sₚᶠ = dualfilter(modelinstance, y, u₀, u, λ, α, β, κ)
xps, xss, Sₓss = singlesmooth(modelinstance, y, pfs[:,end], u₀, u, α, β, κ)
return DualEstimationFit(modelinstance, y, u', xss', symprod!(Sₓss), pfs[:,end], Sₚᶠ)
end #fixedparametersmooth
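# ---------------------------------------------------------------------------
# Usage sketch (not part of the original gist). A hypothetical scalar AR(1)-style
# model with one state, one process noise term, no exogenous inputs and a single
# unknown parameter (the AR coefficient); every setting below is an illustrative
# assumption, not a value used by the author.
#
# f(x, u, v, p) = [p[1]*x[1] + v[1]]   # evolution function (linear here, but F may be nonlinear)
# model = DualEstimationModel(f, [1.0], 0, 1, 1)
# fittable = model([0.5],        # p₀: initial parameter guess
#                  0.1*eye(1),   # Sₚ₀: initial parameter covariance factor
#                  1e-3*eye(1),  # Sᵣ₀: parameter evolution noise factor
#                  [0.0],        # x₀: initial state
#                  eye(1),       # Sₓ₀: initial state covariance factor
#                  0.2*eye(1),   # Sᵥ: process noise factor
#                  0.1)          # σϵ: observation noise standard deviation
# n = 200
# U = zeros(0, n)                # no exogenous inputs
# x, y = simulate(fittable, Float64[], U, [0.8], n)
# ll = gen_loglikelihood(fittable, y, Float64[], U)
# ll([0.8])                      # log-likelihood at the true parameter
# xf, pf, Sp = dualfilter(fittable, y, Float64[], U)
# fit = fixedparametersmooth(fittable, y, Float64[], U)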