Skip to content

Instantly share code, notes, and snippets.

@superjax
Created November 6, 2017 04:01
Show Gist options
  • Save superjax/16f1019f79071cb7d18d5712545f1718 to your computer and use it in GitHub Desktop.
Save superjax/16f1019f79071cb7d18d5712545f1718 to your computer and use it in GitHub Desktop.
EKF_SLAM implementation from Probabilistic Robotics by Sebastian Thrun et al. (look for the plot helper gist for covariance plotting)
import numpy as np
import scipy.linalg
import scipy.stats
import matplotlib.pyplot as plt
import scipy.io
import scipy.sparse
from plot_helper import plot_cov_ellipse
from tqdm import tqdm
def R(theta):
return np.array([[np.cos(theta), -np.sin(theta)],
[np.sin(theta), np.cos(theta)]])
class Robot():
def __init__(self, x0, L, alphas, Q):
self.x = x0
self.alphas = alphas
self.l = L
self.Q = Q
def propagate(self, u, dt):
v = u[0] + np.random.randn()*(self.alphas[0]*u[0]*u[0] + self.alphas[1]*u[1]*u[1])**0.5
w = u[1] + np.random.randn()*(self.alphas[2]*u[0]*u[0] + self.alphas[3]*u[1]*u[1])**0.5
theta = self.x[2]
self.x[0] += v/w * (np.sin(theta + w*dt) - np.sin(theta))
self.x[1] += v/w * (np.cos(theta) - np.cos(theta + w*dt))
self.x[2] += w*dt
if self.x[2] > np.pi:
self.x[2] -= 2*np.pi
elif self.x[2] < -np.pi:
self.x[2] += 2*np.pi
return self.x.copy()
def get_measurement(self):
dx = self.l - np.tile(self.x, (len(self.l),1))[:,0:2]
r = scipy.linalg.norm(dx, axis=1) + np.random.randn(len(self.l))*self.Q[0][0]**0.5
theta = (np.arctan2(dx[:,1], dx[:,0]) - self.x[2] + np.random.randn(len(self.l))*self.Q[1][1]**0.5).flatten()
theta[theta > np.pi] -= 2 * np.pi
theta[theta <= -np.pi] += 2 * np.pi
r[np.abs(theta) > 90.0 * np.pi / 180.0] = np.nan
theta[np.abs(theta) > 90.0*np.pi/180.0] = np.nan
return r.copy(), theta.copy(), self.Q.copy()
def plot(self):
plt.ion()
plt.figure(99)
plt.clf()
ax = plt.subplot(111)
heading_line_end_point = self.x[0:2] + np.array([0.6, 0]).dot(R(self.x[2]).T)
circle = plt.Circle((self.x[1], self.x[0]), 0.5, color='y', alpha=0.7)
ax.add_artist(circle)
ax.plot([self.x[1], heading_line_end_point[1]],
[self.x[0], heading_line_end_point[0]], linewidth=2)
ax.plot(self.l[:,1], self.l[:,0], 'kx', markersize=10)
plt.axis('equal')
plt.show()
plt.pause(0.001)
class Controller():
def control(self, t):
u = np.array([1 + 0.6*np.cos(2.*np.pi*0.2*t),
-0.2 + 2.0*np.cos(2.*np.pi*0.6*t)])
return u
class EKF():
def __init__(self, x0, alphas, P0_feat):
self.x = x0
self.P = 0.1*np.eye(len(x0))
self.feat_P0 = P0_feat
self.alphas = alphas
self.num_features = 0
def add_feature(self, pos):
self.num_features += 1
self.x = np.concatenate((self.x, pos), axis=0)
self.P = scipy.linalg.block_diag(self.P, self.feat_P0)
def propagate(self, u, dt):
v = u[0]
w = u[1]
Q_u = np.array( [[self.alphas[0]*u[0]*u[0], self.alphas[1]*u[1]*u[1]],
[self.alphas[2]*u[0]*u[0], self.alphas[3]*u[1]*u[1]]])
theta = self.x[2]
ct = np.cos(theta)
st = np.sin(theta)
st_pw = np.sin(theta + w*dt)
ct_pw = np.cos(theta + w*dt)
self.x[0] += -v/w*st + v/w*st_pw
self.x[1] += v/w*ct - v/w*ct_pw
self.x[2] += w*dt
# wrap psi from pi to -pi
if self.x[2] > np.pi:
self.x[2] -= 2.*np.pi
elif self.x[2] <= - np.pi:
self.x[2] += 2.*np.pi
G = np.eye(3+2*self.num_features)
G[0:2,2] = np.array([-v/w*ct + v/w*ct_pw,
-v/w*st + v/w*st_pw])
F = np.zeros((3+2*self.num_features, 2))
F[0:3,0:2] = np.array([[1./w * (-st + st_pw), v/(w*w) * (st - st_pw) + v/w * ct_pw*dt],
[1./w * (ct-ct_pw), -v/(w*w)*(ct - ct_pw) + v/w * (st_pw*dt)],
[0., dt]])
Qx = [0, 0, 0]
Qx.extend([0.001 for i in range(2 * self.num_features)])
self.P = G.dot(self.P).dot(G.T) + F.dot(Q_u).dot(F.T) + np.diag(Qx)
return self.x.copy(), self.P.copy()
def update(self, r, theta, R):
est_landmarks = np.reshape(self.x[3:], (2,self.num_features), order='F')
dx = est_landmarks - self.x[0:2,None]
rhat = scipy.linalg.norm(dx, axis=0)
q = rhat*rhat
that = np.arctan2(dx[1], dx[0]) - self.x[2]
zhat = np.vstack((rhat, that))
z = np.vstack((r, theta))
residual = z - zhat
residual[1, residual[1, :] > np.pi] -= 2. * np.pi
residual[1, residual[1, :] <= -np.pi] += 2. * np.pi
residual[residual > 0.05] = 0.05
residual[residual < -0.05] = -0.05
for l in range(self.num_features):
if not (np.isfinite(r[l]) and np.isfinite(theta[l])):
continue
H = np.zeros((2, 3+2*self.num_features))
H[0:2,0:2] = 1/q[l]*np.array([[-rhat[l]*dx[0,l], -rhat[l]*dx[1,l]],
[dx[1,l], -dx[0,l]]])
H[0:2,3+2*l:5+2*l] = 1/q[l]*np.array([[rhat[l]*dx[0, l], rhat[l]*dx[1,l]],
[-dx[1, l], dx[0, l]]])
K = self.P.dot(H.T).dot(scipy.linalg.inv(H.dot(self.P).dot(H.T) + R))
self.x += K.dot(residual[:,l])
self.P = (np.eye(3+2*self.num_features) - K.dot(H)).dot(self.P)
return zhat
def run(x0, L, alphas, Q, P0_feat):
robot = Robot(x0.copy(), L.copy(), alphas.copy(), Q)
controller = Controller()
filter = EKF(x0.copy(), alphas.copy(), P0_feat)
dt = 0.1
tm = np.arange(0, 40, dt)
truth = np.nan*np.ones((len(tm), 3))
estimate = np.nan*np.ones((len(tm),3 + 2*len(L)))
cov = np.nan*np.ones((len(tm),3+2*len(L), 3+2*len(L)))
range_measurements = np.nan*np.ones((len(tm), len(L)))
bearing_measurements = np.nan*np.ones((len(tm), len(L)))
zhats = np.nan*np.ones((len(tm), 2, len(L)))
for i in range(len(L)):
filter.add_feature(L[i] + np.random.multivariate_normal(np.zeros(2), P0_feat))
for j, t in tqdm(enumerate(tm)):
u = controller.control(t)
truth[j] = robot.propagate(u, dt)
r, bearing, Q = robot.get_measurement()
range_measurements[j] = r
bearing_measurements[j] = bearing
# robot.plot()
xhat, P = filter.propagate(u, dt)
cov[j] = P
estimate[j] = xhat
zhat = filter.update(r, bearing, Q)
zhats[j] = zhat
if j % 10 == 0:
plt.ion()
plt.figure(1)
plt.clf()
plt.plot(robot.l[:, 1], robot.l[:, 0], 'kx', markersize=10)
for l in range(len(L)):
Xi = 3+2*l
plot_cov_ellipse(np.fliplr(np.flipud(cov[j,Xi:Xi+2,Xi:Xi+2])), np.flipud(estimate[j,Xi:Xi+2]), nstd=2, alpha=0.5)
plt.plot(truth[:, 1], truth[:, 0], 'b-', label='truth')
plt.plot(estimate[:, 1], estimate[:, 0], 'r-', label='estimate')
plot_cov_ellipse(np.fliplr(np.flipud(cov[j, 0:2, 0:2])), np.flipud(estimate[j, 0:2]), nstd=2, alpha=0.5)
plt.axis('equal')
plt.legend()
plt.show()
plt.savefig("images/"+str(j).zfill(5) + ".png")
plt.pause(0.005)
plt.ioff()
plt.figure(2)
plt.clf()
for i in range(3):
plt.subplot(3, 1, i+1)
plt.plot(tm, truth[:, i], label="truth")
plt.plot(tm, estimate[:, i], label="estimate")
plt.plot(tm, estimate[:,i] + 2.*np.sqrt(cov[:,i,i]), 'k', alpha=0.5)
plt.plot(tm, estimate[:, i] - 2. * np.sqrt(cov[:, i, i]), 'k', alpha=0.5)
if j == 0 and i == 0:
plt.legend()
plt.figure(5)
plt.clf()
for i in range(3):
plt.subplot(3, 1, i+1)
plt.plot(tm, np.abs(truth[:, i] - estimate[:, i]), label="error")
plt.plot(tm, 2.*np.sqrt(cov[:,i,i]), 'k', alpha=0.5)
if i == 0:
plt.legend()
plt.figure(3)
plt.clf()
for i in range(3):
plt.subplot(3, 1, i+1)
plt.plot(tm, np.abs(range_measurements[:, i] - zhats[:, 0, i]), label="range_error")
plt.plot(tm, np.abs(bearing_measurements[:, i] - zhats[:, 1, i]), label="bearing_error")
plt.plot(tm, 2. * np.sqrt(cov[:, 3+i, 3+i]), 'k', alpha=0.5)
if i == 0:
plt.legend()
plt.figure(4)
plt.clf()
for i in range(len(L)):
plt.subplot(len(L), 1, i + 1)
plt.plot([tm[0], tm[-1]], [L[i,0], L[i,0]], label="truth_x")
plt.plot(tm, estimate[:,3 + i], label="est_x")
plt.plot([tm[0], tm[-1]], [L[i,1], L[i,1]], label="truth_y")
plt.plot(tm, estimate[:, 4 + i], label="est_y")
# plt.ion()
plt.show()
# plt.pause(0.005)
# plt.ioff()
if __name__ == '__main__':
x0 = np.array([0, 0, 90.*np.pi/180.])
L = np.array([[6.,4.],
[-7.,8.],
[6., -4.],
[-8., -10.],
[5., -3.],
[-1., 6.],
[0., -2.]])
L = np.random.uniform(-20, 20, (25, 2))
alphas = np.array([0.001, 0.001, 0.001, 0.001])
Q = np.diag([0.0005, 0.0005])
P0_feat = np.diag([0.1, 0.1])
run(x0, L, alphas, Q, P0_feat)
@el3ment
Copy link

el3ment commented Nov 6, 2017

https://gist.github.com/superjax/16f1019f79071cb7d18d5712545f1718#file-ekf_slam-py-L92 is incorrect it should be more like (see page 204)

qu = np.array([[alphas[0] * v**2 + alphas[1] * omega ** 2, 0],
                  [0,                                         alphas[2] * v**2 + alphas[3] * omega ** 2]])

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment