Skip to content

Instantly share code, notes, and snippets.

@rjw57
Created March 15, 2016 16:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rjw57/b6d31dbad8d5239fafdb to your computer and use it in GitHub Desktop.
Save rjw57/b6d31dbad8d5239fafdb to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Track observations outputted by reproject-obs.py
Usage:
track-reproj-observations.py [options] <observations> <output>
Options:
--max-frame=FRAME Only process up to frame FRAME.
<observations> is a npz file generated by scripts/reproject-obs.py
<output> is the .MAT file to write the output to.
The output has a single variable, smoothed, where each row is the following:
- frame index
- track index
- world x
- world y
- world x velocity
- world y velocity
- image space width
- image space height
"""
from __future__ import division, print_function
import itertools
from collections import namedtuple
import docopt
import numpy as np
import scipy.io as sio
Track = namedtuple('Track',
'state_estimates state_covariances observations birth_time')
def as_square_array(X):
"""A utility function to return X massaged into a square array. Raises ValueError if
X cannot be so massaged.
"""
X = np.atleast_2d(X)
if len(X.shape) != 2 or X.shape[0] != X.shape[1]:
raise ValueError('Expected square matrix')
return X
# ## The Kalman filter
#
# We make use of the standard Kalman filter to both interpolate missing
# observations for a given track and predict observations for the next frame.
#
# Let's have a brief reminder of the Kalman filter. At each time instant, $k$,
# we assert that there is an *a priori* estimate of state, $\hat{x}_{k|k-1}$,
# and state estimate covariance, $P_{k|k-1}$. The initial values of these
# parameters are given when the Kalman filter is initialised. Our goal is to
# form an *a posteriori* estimate of state, $\hat{x}_{k|k}$, and state estimate
# covariance, $P_{k|k}$ given an observation, $z_k$.
#
# ### *A Priori* Prediction
#
# At time $k$ we are given a state transition matrix, $F_k$, and estimate of the
# *process noise*, $Q_k$. Our *a priori* estimates are then given by:
#
# $$ \hat{x}_{k|k-1} = F_k \hat{x}_{k-1|k-1}, \quad P_{k|k-1} = F_k P_{k-1|k-1}
# F_k^T + Q_k. $$
#
# ### Innovation
#
# At time $k$ we are given a matrix, $H_k$, which gives the "true" observation
# given the state and some estimate of the observation noise covariance, $R_k$.
# We may now compute the innovation, $y_k$, of the observation from the
# predicted observation and our expected innovation covariance, $S_k$:
#
# $$ y_k = z_k - H_k \hat{x}_{k|k-1}, \quad S_k = H_k P_{k|k-1} H_k^T + R_k. $$
#
# ### Update
#
# We now merge our observation with the state estimate via the so-called *Kalman
# gain*, $K_k$:
#
# $$ K_k = P_{k|k-1} H_k^T S_k^{-1}. $$
#
# Merging is straightforward. Note that if we have no observation, our *a
# posteriori* estimate reduces to the *a priori* one:
#
# $$ \hat{x}_{k|k} = \hat{x}_{k|k-1} + K_k y_k, \quad P_{k|k} = P_{k|k-1} - K_k
# H_k P_{k|k-1}. $$
# We implement the Kalman filter as a class which maintains a list of *a priori*
# and *a posteriori* state estimates and state estimate covariances. Each filter
# has a ``predict`` and ``update`` method corresponding to the steps above.
# Additionally we add a convenience ``innovation_squared_distance`` methof which
# returns the squared Euclidean innovation magnitudes for a set of candidate
# observations normalised by the inverse of the innovation covariance.
class KalmanFilter(object):
def __init__(self, initial_mean, initial_covariance, birth_time=0):
self.prior_state_means = [np.copy(initial_mean)]
self.prior_state_covariances = [as_square_array(np.copy(initial_covariance))]
self.posterior_state_means = [self.prior_state_means[-1]]
self.posterior_state_covariances = [self.prior_state_covariances[-1]]
# Used by RTS smoother first transition matrix corresponding to initial
# mean and covariance is meaningless. Add placeholder of None.
self.process_matrices = [None]
self.process_noises = [None]
# The observations list holds records of observations associated with the filter at
# time k. If no observations are recorded, this will be an empty list. Otherwise it is
# a list of observation, observation_matrix, observation_covariance triples.
self.observations = [[]]
self.birth_time = birth_time
self.last_observation_idx = None # Index at which the last observation was associated
def predict(self, process_matrix, process_noise):
"""Predict the next prior state mean and covariance given the last
posterior. After calling this method, prior_state_{means,covariances}
has an element appended.
"""
process_matrix = as_square_array(process_matrix)
process_noise = as_square_array(process_noise)
if process_matrix.shape[0] != process_noise.shape[0]:
raise ValueError("Process matrix and noise have incompatible shapes: {} vs {}".format(
process_matrix.shape, process_noise.shape))
# Update state mean and covariance
self.prior_state_means.append(process_matrix.dot(self.posterior_state_means[-1]))
self.prior_state_covariances.append(
process_matrix.dot(self.posterior_state_covariances[-1]).dot(process_matrix.T) +
process_noise
)
# Record transition matrix
self.process_matrices.append(process_matrix)
self.process_noises.append(process_noise)
# Append empty list to observations for this time step
self.observations.append([])
# Seed posterior estimates with *copies* of the prior ones
self.posterior_state_means.append(np.copy(self.prior_state_means[-1]))
self.posterior_state_covariances.append(np.copy(self.prior_state_covariances[-1]))
def update(self, observation, observation_matrix, observation_noise):
"""After each predict(), update() may be called repeatedly to provide additional observations
for each time step.
"""
observation_matrix = np.atleast_2d(observation_matrix)
observation_noise = as_square_array(observation_noise)
observation = np.atleast_1d(observation)
expected_obs_mat_shape = (observation_noise.shape[0], self.state_dimension)
if observation_matrix.shape != expected_obs_mat_shape:
raise ValueError('Observation matrix is wrong shape ({}). Expected: {}'.format(
observation_matrix.shape, expected_obs_mat_shape))
if observation.shape != (observation_noise.shape[0],):
raise ValueError('Observation is wrong shape ({}). Expected: {}'.format(
observation.shape, (observation_noise.shape[0],)
))
# Add observation triple to list
self.observations[-1].append((observation, observation_matrix, observation_noise))
self.last_observation_idx = len(self.observations) - 1
# "Prior" in this case means "before we've updated with this observation".
prior_mean = np.copy(self.posterior_state_means[-1])
prior_covariance = np.copy(self.posterior_state_covariances[-1])
# Can compute innovation covariance & Kalman gain without an observation
innovation = observation - observation_matrix.dot(prior_mean)
innovation_cov = observation_matrix.dot(prior_covariance).dot(observation_matrix.T)
innovation_cov += observation_noise
kalman_gain = prior_covariance.dot(observation_matrix.T).dot(np.linalg.inv(innovation_cov))
# Update estimates
self.posterior_state_means[-1] += kalman_gain.dot(innovation)
self.posterior_state_covariances[-1] -= kalman_gain.dot(observation_matrix).dot(
prior_covariance)
@property
def last_idx(self):
return len(self.posterior_state_means)
@property
def state_dimension(self):
return self.prior_state_means[-1].shape[0]
@property
def observation_count(self):
return sum(len(o) for o in self.observations)
def innovation_squared_distance(self, observation_matrix, observation_noises, observations):
"""
Given a NxOBS_DIM array of observations, return normalised innovation
squared distances from the current prior mean. The observation_noises
matrix should be an NxOBS_DIMxOBS_DIM array giving observation
covariances for each observation
"""
observation_matrix = np.atleast_2d(observation_matrix)
observation_noises = np.atleast_3d(observation_noises)
expected_obs_mat_shape = (observation_noises.shape[1], self.state_dimension)
if observation_matrix.shape != expected_obs_mat_shape:
raise ValueError('Observation matrix is wrong shape ({}). Expected: {}'.format(
observation_matrix.shape, expected_obs_mat_shape))
observations = np.atleast_2d(observations)
if observations.shape[1] != observation_noises.shape[1]:
raise ValueError('Observation of unexpected length {}. Expected: {}'.format(
observations.shape[1], observation_noises.shape[1]))
# Compute current posterior observation mean and covariance.
# Note: if update() has not yet been called, these are just the a priori estimates
posterior_obs_mean = observation_matrix.dot(self.posterior_state_means[-1])
posterior_obs_cov = observation_matrix.dot(self.posterior_state_covariances[-1]).dot(
observation_matrix.T)
innovation_dists = []
for observation_noise, observation in zip(observation_noises, observations):
observation_noise = as_square_array(observation_noise)
# Add in any observation noise
innovation_cov = posterior_obs_cov + observation_noise
innovation_cov_inv = np.linalg.inv(innovation_cov)
# Compute innovations
innovation = observation - posterior_obs_mean
innovation_dists.append(innovation.dot(innovation_cov_inv).dot(innovation))
return np.array(innovation_dists)
# ### Merging of filters
#
# During tracking, some filters may end up converging on the same underlying
# car. This is usually due to multiple overlapping observations being generated
# by one true car. We counter this by merging tracks which "collide". Merging is
# done by creating a new filter from an existing filter by "replaying" the
# observations and predict steps.
def merge_kalman_filters(a, b, initial_mean, initial_covariance):
"""Merge KalmanFilter instances a and b into a single KalmanFilter. Return
the merged filter.
"""
if a.state_dimension != b.state_dimension:
raise ValueError('Filters have differing state dimension, {} vs {}'.format(
a.state_dimension, b.state_dimension
))
initial_mean = np.atleast_1d(initial_mean)
if a.state_dimension != initial_mean.shape[0]:
raise ValueError('Initial mean has incorrect dimension {}. Expected: {}'.format(
initial_mean.shape[0], a.state_dimension
))
# Start at minimum birth time
birth_time = min(a.birth_time, b.birth_time)
# Create new filter
new_f = KalmanFilter(initial_mean, initial_covariance, birth_time)
# Start moving through time
for t in itertools.count(birth_time):
# Work out which filters this time corresponds to
a_k, b_k = t - a.birth_time, t - b.birth_time
within_a = a_k >= 0 and a_k < a.last_idx
within_b = b_k >= 0 and b_k < b.last_idx
# If we've left both filters, return
if not within_a and not within_b:
return new_f
# If this is not the first time step, predict a priori states for
# new filter
if t > birth_time:
F = np.zeros((new_f.state_dimension, new_f.state_dimension))
Q = np.zeros((new_f.state_dimension, new_f.state_dimension))
nF = 0
if within_a and a_k > 0:
F += a.process_matrices[a_k]
Q += a.process_noises[a_k]
nF += 1
if within_b and b_k > 0:
F += b.process_matrices[b_k]
Q += b.process_noises[b_k]
nF += 1
assert nF >= 1
F /= nF
Q /= nF
new_f.predict(F, Q)
# Now simply update with a and b observations
all_obs = []
if within_a:
all_obs.extend(a.observations[a_k])
if within_b:
all_obs.extend(b.observations[b_k])
for obs, obs_mat, obs_noise in all_obs:
new_f.update(obs, obs_mat, obs_noise)
# Should never be reached
assert False
# ## Tracking
#
# Setting the state transition and observation matrices is straightforward.
# There's a degree of finger-in-the-air estimation for the process and
# observation noises.
STATE_DIM = OBS_DIM = 6
# State and observation are x, y, vx, vy, w, h
F = np.array([
[1, 0, 1, 0, 0, 0],
[0, 1, 0, 1, 0, 0],
[0, 0, 1, 0, 0, 0],
[0, 0, 0, 1, 0, 0],
[0, 0, 0, 0, 1, 0],
[0, 0, 0, 0, 0, 1],
])
# All state vector elements are directly observed
H = np.eye(OBS_DIM)
# We calculate process and observation noise based on the current state and observation
def process_noise_for_state(state):
# The process noise is assumed to be small (except for w and h which could
# do their own thing really)
v_mag = np.hypot(state[2], state[3])
v_mag = min(0.2, v_mag)
w = state[4]
Q = np.diag([1e-1, 1e-1, 3e-4 + 1e-1*v_mag, 3e-4 + 1e-1*v_mag,
1e0 + 1e-2*w, 1e0 + 1e-2*w])**2
return Q
def measurement_noise_for_observation(obs, camera_at):
# The measurement noise is somewhat loosely estimated.
v_mag = np.hypot(obs[2], obs[3])
v_mag = min(0.2, v_mag)
dist = np.sqrt(np.sum(np.square(camera_at - np.array([obs[0], obs[1], 0]))))
w = obs[4]
R = np.diag([1e-1 + 1e-1*dist, 1e-1 + 1e-1*dist, 1e-2 + 1e-1*v_mag, 1e-2 + 1e-1*v_mag,
1e1 + 1e-1*w, 1e1 + 1e-1*w])**2
return R
# Overlap radius (in car-plane) which tracks should be merged
MERGE_RADIUS = 0.8
# Maximum number of frames a filter may live with no observations
MAX_GAP = 4
# ### Scott and Longuet-Higgins feature association algorithm
#
# Each filter will predict which observation should be generated in the next
# frame. We need an algorithm to associate actual observations with predicted
# ones. The Scott and Longuet-Higgins (SLH) algorithm is a fairly neat method
# which requires only that we can construct some distance metric for candidate
# association pairs. Fortunately this is exactly what the innovation covariance
# in the Kalman filter gives us.
#
# When tracking we will form a Gaussian-weighted proximity matrix, $G$, where
# each element is given by $G_{ij} = \exp(-d_{ij}^2)$. The value $d_{ij}^2$ is
# the squared innovation distance returned by ``innovation_squared_distance``.
# These distances are already normalised with the inverse of the innovation
# covariance.
#
# The SLH algorithm proceeds by taking the SVD, $U \, S \, V^H = G$. The
# diagonal matrix $S$ contains the singular values along the diagonal. A similar
# matrix $E$ is created which is like $S$ except that the non-zero singular
# values are set to unity. The matrix $P = U \, E \, V^H$ is then computed.
#
# This has the effect of "amplifying" the proximities of closely associated
# pairs in a global way.
def slh_association_matrix(G):
"""Perform the association step of the Scott and Longuet-Higgins algorithm.
G is a Gaussian weighted proximity matrix where G_ij is the Gaussian
weighted proximity of feature i to feature j. Returns an "amplified"
proximity matrix, P, of the same shape as G where feature i is associated
with feature j iff P_ij is the maximal value in its row and column.
Source: http://www.bmva.org/bmvc/1997/papers/081/node3.htm
"""
# Take SVD of G
U, S, Vh = np.linalg.svd(G)
# How many non-zero singular values?
nsv = np.count_nonzero(S)
# Trim U and Vh to match this number
U, Vh = U[:, :nsv], Vh[:nsv, :]
# Compute new matrix P as if singular values were unity
P = U.dot(Vh)
return P
# Forming associations given the $P$ matrix is simple; we associate feature $i$
# and $j$ is $P_{ij}$ is the maximal value in both its row and column. We can
# write a simple function to extract associations.
def slh_associations(P):
"""Given an association matrix P return a Nx2 matrix of associations where
each row is an i,j pair asserting an asssociation of features i and j.
"""
associations = []
# Compute column-wise maxima
col_max_idxs = np.argmax(P, axis=0)
# For each row of P...
for row_idx, row in enumerate(P):
# ... compute index of maximum element
col_idx = np.argmax(row)
# Is this row also the maximum in that column?
if col_max_idxs[col_idx] == row_idx:
associations.append((row_idx, col_idx))
if len(associations) == 0:
return np.zeros((0, 2))
return np.vstack(associations)
# When tracking we may get into a situation where two filters are tracking the
# same underlying car with each filter getting one N-th of the observations. To
# avoid this we add the ability to merge tracks to the tracking. Two tracks, A
# and B, should be merged if *all* of the following hold:
#
# 1. The separation of the cars in the traffic-plane is within some merge radius.
# 2. The separation of A's mean w.r.t. B is large.
# 3. The separation of B's mean w.r.t. A is large.
# How are traffic-plane observations taken
MERGE_OBSERVATION_MATRIX = np.array([
[1, 0, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 0],
])
def should_merge_filters(fa, fb, merge_radius=1, sigma_radius=3,
merge_obs_matrix=MERGE_OBSERVATION_MATRIX):
merge_obs_matrix = np.atleast_2d(merge_obs_matrix)
a_obs = merge_obs_matrix.dot(fa.posterior_state_means[-1])
b_obs = merge_obs_matrix.dot(fb.posterior_state_means[-1])
# Observations must be within merge radius
if np.sum(np.square(a_obs - b_obs)) > merge_radius * merge_radius:
return False
a_cov = fa.posterior_state_covariances[-1]
b_cov = fb.posterior_state_covariances[-1]
# State difference w.r.t. covariances
delta = fa.posterior_state_means[-1] - fb.posterior_state_means[-1]
delta = delta.dot(np.linalg.inv(a_cov + b_cov)).dot(delta)
# Deltas must be outside of sigma_radius
if delta <= sigma_radius*sigma_radius:
return False
return True
def track(observations_by_frame, min_frame, max_frame,
camera_at, max_gap, merge_radius):
# The tracking itself maintains a set of "live" tracks which have had an
# observation associated with them in the past ``max_gap`` frames. Tracks
# which don't get an observation within that time are considered to have
# "died".
#
# For each live filter, the normalised squared innovation distances to each
# observation is computed. From this matrix, the SLH $G$ and $P$ matrices
# are computed. The $P$ matrix is used to form filter, observation
# associations.
#
# For each association which is within a threshold distance, the filter is
# associated with the obsersvation and the ``update`` method is called.
# Filters which do not get an associated observation are updated with
# missing data. Observations with no filter associations are used to
# initialise new live filters.
#
# Finally, any live filter which has not received an observation for
# ``max_gap`` frames is killed.
# Maximum Gaussian-weighted distance for an observation pair which is
# considered associated
gaussian_dist_threshold = np.exp(-4*4)
# Various parameters updated during tracking
live_filters, dead_filters = set(), set()
state_dim = F.shape[0]
init_mean, init_cov = np.zeros(state_dim), np.diag(1e10 * np.ones(state_dim))
meas_noise_cb = lambda obs: measurement_noise_for_observation(obs, camera_at)
for frame_idx in range(min_frame, max_frame):
# Predict observations for all live filters
for f in live_filters:
f.predict(F, process_noise_for_state(f.posterior_state_means[-1]))
# Get observations for this frame
frame_observations = observations_by_frame.get(frame_idx, np.zeros((0,)))
# Covert live filters to an ordered list
live_filter_list = list(live_filters)
# If we have some current filters and some current observations...
if len(live_filter_list) > 0 and frame_observations.shape[0] > 0:
# ...we implement the Scott and Longuet-Higgins algorithm for tracking.
# Compute matrix of Gaussian weighted proximities, G
G = np.exp(-np.vstack([
f.innovation_squared_distance(
H,
np.vstack([meas_noise_cb(o)[np.newaxis, ...]
for o in frame_observations]),
frame_observations
)
for f in live_filter_list
]))
# Compute SLH association matrix
P = slh_association_matrix(G)
# Compute associations
associations = slh_associations(P)
# Filter associations by whether G is sufficiently large
associations = [
(f_idx, o_idx) for f_idx, o_idx in associations
if G[f_idx, o_idx] > gaussian_dist_threshold
]
else:
# We have no associations
associations = []
# Now process associations
filters_with_no_observation = set(range(len(live_filter_list)))
observations_with_no_filter = set(range(frame_observations.shape[0]))
for f_idx, o_idx in associations:
f = live_filter_list[f_idx]
o = frame_observations[o_idx]
f.update(o, H, meas_noise_cb(o))
# This filter now has an association
filters_with_no_observation.remove(f_idx)
observations_with_no_filter.remove(o_idx)
# Create new filters for orphan observations
for o_idx in observations_with_no_filter:
new_f = KalmanFilter(init_mean, init_cov, frame_idx)
new_f.update(frame_observations[o_idx], H,
meas_noise_cb(frame_observations[o_idx]))
live_filters.add(new_f)
# Update ordered list of live filters
live_filter_list = list(live_filters)
# Now consider pairwise filters. We merge filters which overlap.
live_filters = set()
removed_filters = set()
for fa_idx, fa in enumerate(live_filter_list):
if fa in removed_filters:
continue
to_merge = set()
for fb in live_filter_list[fa_idx+1:]:
if fb in removed_filters:
continue
if should_merge_filters(fa, fb, merge_radius=merge_radius):
to_merge.add(fb)
removed_filters.add(fb)
for fb in to_merge:
fa = merge_kalman_filters(fa, fb, init_mean, init_cov)
live_filters.add(fa)
# Kill any filters which have no observations for the past max_gap frames
to_kill = set(
f for f in live_filters
if f.last_idx - f.last_observation_idx > max_gap
)
live_filters.difference_update(to_kill)
dead_filters.update(to_kill)
return live_filters, dead_filters
# ## Rauch–Tung–Striebel smoothing
#
# The Rauch–Tung–Striebel (RTS) smoother provides a method of computing the "all
# data" *a posteriori* estimate of states (as opposed to the "all previous data"
# estimate). Assuming there are $n$ time points in the filter, then the RTS
# computes the *a posteriori* state estimate at time $k$ after all the data for
# $n$ time steps are known, $\hat{x}_{k|n}$, and corresponding covariance,
# $P_{k|n}$, recursively:
#
# $$ \hat{x}_{k|n} = \hat{x}_{k|k} + C_k ( \hat{x}_{k+1|n} - \hat{x}_{k+1|k} ),
# \quad P_{k|n} = P_{k|k} + C_k ( P_{k+1|n} - P_{k+1|k} ) C_k^T $$
#
# with $C_k = P_{k|k} F^T_{k+1} P_{k+1|k}^{-1}$.
def rts_smooth(kf, n=None):
"""Taking an instance of KalmanFilter, return a (states, state_covariances)
pair. states is a NxSTATE_DIM array of smoothed state estimates and
state_covariances is a NxSTATE_DIMxSTATE_DIM array of state estimate
covariances.
If N is not specified, it defaults to the last index with an observation.
"""
state_dim = kf.state_dimension
n_states = kf.last_observation_idx + 1
states = np.nan * np.ones((n_states, state_dim))
state_covariances = np.nan * np.ones((n_states, state_dim, state_dim))
# Special case: asked for no states
if n_states == 0:
return states, state_covariances
# Initialise with final posterior estimate
states[-1, ...] = kf.posterior_state_means[-1]
state_covariances[-1, ...] = kf.posterior_state_covariances[-1]
# Work backwards from final state
for k in range(n_states-2, -1, -1):
F = kf.process_matrices[k+1]
C = kf.posterior_state_covariances[k].dot(F.T).dot(
np.linalg.inv(kf.prior_state_covariances[k+1])
)
# Calculate smoothed state and covariance
states[k, ...] = kf.posterior_state_means[k] + \
C.dot(states[k+1, ...] - kf.prior_state_means[k+1])
state_covariances[k, ...] = kf.posterior_state_covariances[k] + C.dot(
state_covariances[k+1, ...] - kf.prior_state_covariances[k+1]
).dot(C.T)
return states, state_covariances
def main():
opts = docopt.docopt(__doc__)
input_file = opts['<observations>']
obs_dict = np.load(input_file)
observations = obs_dict['observations']
camera_at = obs_dict['camera_at']
observations_by_frame = dict(
(k, np.array(list(v))[:, 1:7])
for k, v in itertools.groupby(observations, lambda o: o[0])
)
frames_with_observations = np.asarray(list(observations_by_frame.keys())).astype(np.int)
min_frame = int(observations[:, 0].min())
max_frame = int(observations[:, 0].max())
if opts['--max-frame'] is not None:
max_frame = int(opts['--max-frame'])
live_filters, dead_filters = track(observations_by_frame,
min_frame=min_frame, max_frame=max_frame,
camera_at=camera_at,
max_gap=5, merge_radius=1.0)
print('After tracking, {} live filters and {} dead ones'.format(
len(live_filters), len(dead_filters)))
# Turn filters into tracks
tracks = list()
for f in dead_filters.union(live_filters):
t_observations = f.observations[:f.last_observation_idx]
t_state_estimates, t_state_covariances = rts_smooth(f)
# Only consider tracks with many observations and long length
if f.observation_count > 5 and t_state_estimates.shape[0] > 25:
tracks.append(Track(state_estimates=np.vstack(t_state_estimates),
state_covariances=np.vstack(t_state_covariances),
observations=t_observations, birth_time=f.birth_time))
print('Obtained {} tracks'.format(len(tracks)))
# ## Writing the output
#
# We write the tracks to a MATLAB file. We firstly stack all the track
# states up into a single matrix called "smoothed" whose first column is
# frome index, second is track if and remaining are the state. For
# convenience of subsequent visualisation we then sort it by frame index.
# Build up smoothed matrix firstly as a list and then stack it
smoothed = []
for t_idx, t in enumerate(tracks):
states = t.state_estimates
frame_idxs = (t.birth_time + np.arange(states.shape[0])).reshape((-1, 1))
track_idxs = np.ones((states.shape[0], 1)) * t_idx
smoothed.append(np.hstack((frame_idxs, track_idxs, states)))
smoothed = np.vstack(smoothed)
# Sort smoothed tracks by frame index
smoothed = smoothed[np.argsort(smoothed[:, 0]), ...]
# Save
sio.savemat(opts['<output>'], dict(smoothed=smoothed))
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment