Skip to content

Instantly share code, notes, and snippets.

@yjzhang
Created March 10, 2018 23:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yjzhang/8788796b3a082940ebc9a8052b1148d2 to your computer and use it in GitHub Desktop.
Save yjzhang/8788796b3a082940ebc9a8052b1148d2 to your computer and use it in GitHub Desktop.
particle filter implementation for localization in a rectangular room with 1 gaussian observation.
import numpy as np
from scipy.stats import norm
# particle filter for localization using echolocation
# given: measurements, accelerometer readings, heading
# user's state: [x, y, theta]
# control: d(theta), d(position)
# measurement: distance reading
def generate_initial_states(room_dimensions, num_states):
"""
Args:
room_dimensions (array): 3-tuple of x, y, z dimensions
num_states (int)
Returns:
2d array, where each row is a state.
"""
angles = np.random.uniform(0, 2*np.pi, (num_states, ))
x = np.random.uniform(0, room_dimensions[0], (num_states, ))
y = np.random.uniform(0, room_dimensions[1], (num_states, ))
return np.array([x, y, angles]).T
def update_state(state, control):
"""
"""
dt = control[0]
dp = control[1]
dx = np.cos(state[2]+dt)*dp
dy = np.sin(state[2]+dt)*dp
return np.array([state[0]+dx, state[1]+dy, state[2] + dt])
def update_all_states(states, control):
"""
"""
dt = control[0]
dp = control[1]
dx = np.cos(states[:,2] + dt)*dp
dy = np.sin(states[:,2] + dt)*dp
return np.array([states[:, 0]+dx, states[:, 1]+dy, states[:, 2] + dt]).T
def measurement_prob(states, measurement, room_shape, error_variance=1.0):
"""
Args:
states (array): n x 3 array, n is number of states
measurement (float): a single measurement (distance)
room_shape (tuple): dimensions of room (x, y, z)
Returns:
1d array of measurement likelihoods given state
"""
normal_dist = norm(0, np.sqrt(error_variance))
state_weights = np.zeros(states.shape[0])
for i in range(states.shape[0]):
s = states[i,:]
x = s[0]
y = s[1]
t = s[2]
# do ray tracing
ax = x
ay = y
while ax >= 0 and ay >= 0 and ay < room_shape[1] and ax < room_shape[0]:
ax += np.cos(t)
ay += np.sin(t)
actual_d = np.sqrt((ax - x)**2 + (ay - y)**2)
diff = actual_d - measurement
state_weights[i] = normal_dist.pdf(diff)
return state_weights
def measurement_prob_2(states, measurements, room_shape, error_variance=2):
"""
Args:
states (array): n x 3 array, n is number of states
measurement (float): a single measurement (distance)
room_shape (tuple): dimensions of room (x, y, z)
Returns:
1d array of measurement likelihoods given state
"""
# TODO: deal with multiple measurements per state
# future work
normal_dist = norm(0, np.sqrt(error_variance))
state_weights = np.zeros(states.shape[0])
for i in range(states.shape[0]):
s = states[i,:]
x = s[0]
y = s[1]
t = s[2]
# do ray tracing
ax = x
ay = y
while ax >= 0 and ay >= 0 and ay < room_shape[1] and ax < room_shape[0]:
ax += np.cos(t)
ay += np.sin(t)
actual_d = np.sqrt((ax - x)**2 + (ay - y)**2)
diff = actual_d - measurements
state_weights[i] = normal_dist.pdf(diff)
return state_weights
def resample(states, weights, x_noise_var=0.1, t_noise_var=0.2):
"""
Args:
states (array): n x 3 array
weights (array): likelihoods for each state - shape n
Returns: new array of states
"""
n = states.shape[0]
weights = weights/weights.sum()
state_indices = [i for i in range(n)]
new_indices = np.random.choice(state_indices, size=n, replace=True, p=weights)
new_states = states[new_indices,:]
# add noise to states
new_states[:,0] += np.random.normal(0, x_noise_var, n)
new_states[:,1] += np.random.normal(0, x_noise_var, n)
new_states[:,2] += np.random.normal(0, t_noise_var, n)
return new_states
def particle_filter(controls, measurements, room_shape, n_states,
true_states=None, **params):
"""
controls applied before measurement is taken
"""
initial_states = generate_initial_states(room_shape, n_states)
n_steps = controls.shape[0]
states = initial_states
for i in range(n_steps):
print('step {0}'.format(i))
print(states.shape)
updated_states = update_all_states(states, controls[i])
print(updated_states.shape)
weights = measurement_prob(updated_states, measurements[i], room_shape)
new_states = resample(updated_states, weights)
print(new_states.mean(0))
states = new_states
if true_states is not None:
plot_results(states, room_shape, true_states[i,:])
return states
def plot_results(samples, room_shape, true_state=None):
import matplotlib.pyplot as plt
plt.clf()
plt.xlim(0, room_shape[0])
plt.ylim(0, room_shape[1])
plt.scatter(samples[:,0], samples[:,1])
plt.scatter(true_state[0], true_state[1])
plt.draw()
plt.show()
if __name__ == '__main__':
room_shape = (5, 10, 2)
true_positions = np.array([
[1,1.0,np.pi/2],
[1,2.0,np.pi/2],
[1,3.0,np.pi/2],
[1,4.0,np.pi/2],
[1,5.0,np.pi/2],
[1,5.0,np.pi/2],
[1,5.0,np.pi/2],
[1,5.5,np.pi/2],
[1,6.0,np.pi/2],
[1,6.5,np.pi/2],
[1,7.0,np.pi/2],
[1,7.5,np.pi/2],
[1,8.0,np.pi/2],
[1,8.5,np.pi/2],
[1,9.0,np.pi/2],
[1,9.0,np.pi],
[1,9.0,np.pi],
[2,9.0,np.pi],
[3,9.0,np.pi],
[3,9.0,np.pi],
])
controls = np.array([
[0,0],
[0,1.0],
[0,1.0],
[0,1.0],
[0,1.0],
[0,0.0],
[0,0.0],
[0,0.5],
[0,0.5],
[0,0.5],
[0,0.5],
[0,0.5],
[0,0.5],
[0,0.5],
[0,0.5],
[np.pi/2,0.0],
[0,0.0],
[-1.0,0.0],
[-1.0,0.0],
[0,0.0],
])
measurements = np.array([9.5, 8.2, 6.9, 6.1, 5.3, 5.0, 5.0,
4.5, 4.0, 3.5, 3.0, 2.5, 2.0, 1.5, 1.0,
1.0, 1.0, 2.1, 3.1,
3.0])
results = particle_filter(controls, measurements, room_shape, 5000,
true_states=true_positions)
print(results)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment