-
-
Save narykov/4a2e8af25c14e07921774073cc179e4f to your computer and use it in GitHub Desktop.
Current smoother
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import copy | |
from functools import partial | |
import numpy as np | |
from .base import Smoother | |
from ..base import Property | |
from ..models.base import LinearModel | |
from ..types.multihypothesis import MultipleHypothesis | |
from ..types.prediction import GaussianStatePrediction | |
from ..types.update import GaussianStateUpdate | |
from ..models.transition.base import TransitionModel | |
from ..models.transition.linear import LinearGaussianTransitionModel, LinearTransitionModel | |
from ..functions import gauss2sigma, unscented_transform | |
class KalmanSmoother(Smoother): | |
r""" | |
The linear-Gaussian or Rauch-Tung-Striebel smoother, colloquially the Kalman smoother [1]_. The | |
transition model is therefore linear-Gaussian. No control model is currently implemented. | |
TODO: Include a control model | |
The smooth function undertakes the backward algorithm on a :class:`~.Track()` object. This is | |
done by starting at the final index in the track, :math:`K` and proceeding from | |
:math:`K \rightarrow 1` via: | |
.. math:: | |
\mathbf{x}_{k|k-1} &= F_{k} \mathbf{x}_{k-1} | |
P_{k|k-1} &= F_{k} P_{k-1} F_{k}^T + Q_{k} | |
G_k &= P_{k-1} F_{k}^T P_{k|k-1}^{-1} | |
\mathbf{x}_{k-1}^s &= \mathbf{x}_{k-1} + G_k (\mathbf{x}_{k}^s - \mathbf{x}_{k|k-1}) | |
P_{k-1}^s &= P_{k-1} + G_k (P_{k}^s - P_{k|k-1}) G_k^T | |
where :math:`\mathbf{x}_{K}^s = \mathbf{x}_{K}` and :math:`P_K^s = P_K`. | |
The predicted state vector and covariance are retrieved from the Track via predicted state or | |
updated state via the links therein. Note that this means that the first two equations are not | |
calculated, the results merely retrieved. This smoother is therefore strictly Kalman only in | |
the backward portion. The prediction might have come by any number of means. If present, the | |
transition model (providing :math:`F` and :math:`Q`) in the prediction is used. This allows for | |
a dynamic transition model (i.e. one that changes with :math:`k`). Otherwise, the (static) | |
transition model is used, defined on smoother initialisation. | |
References | |
.. [1] Särkä S. 2013, Bayesian filtering and smoothing, Cambridge University Press | |
""" | |
transition_model: LinearGaussianTransitionModel = Property( | |
doc="The transition model. The :meth:`smooth` function will initially look for a " | |
"transition model in the prediction. If that is not found then this one is used.") | |
def _prediction(self, state): | |
""" Return the predicted state, either from the prediction directly, or from the attached | |
hypothesis if the queried state is an Update. If not a :class:`~.GaussianStatePrediction` | |
or :class:`~.GaussianStateUpdate` a :class:`~.TypeError` is thrown. | |
Parameters | |
---------- | |
state : :class:`~.GaussianStatePrediction` or :class:`~.GaussianStateUpdate` | |
Returns | |
------- | |
: :class:`~.GaussianStatePrediction` | |
The prediction associated with the prediction (i.e. itself), or the prediction from the | |
hypothesis used to generate an update. | |
""" | |
if isinstance(state, GaussianStatePrediction): | |
return state | |
elif isinstance(state, GaussianStateUpdate): | |
if isinstance(state.hypothesis, MultipleHypothesis): | |
predictions = {hypothesis.prediction for hypothesis in state.hypothesis} | |
if len(predictions) == 1: | |
# One predictions, this is fine to use. | |
return predictions.pop() | |
else: | |
# Multiple predictions, so can't process this. | |
raise ValueError( | |
"Track has MultipleHypothesis updates with multiple predictions.") | |
else: | |
return state.hypothesis.prediction | |
else: | |
raise TypeError("States must be GaussianStatePredictions or GaussianStateUpdates.") | |
def _transition_model(self, prediction): | |
""" If it exists, return the transition model from the prediction associated with input | |
state. If that doesn't exist then use the (static) transition model defined by the | |
smoother. | |
Parameters | |
---------- | |
prediction : :class:`~.GaussianStatePrediction` or :class:`~.GaussianStateUpdate` | |
Returns | |
------- | |
: :class:`~.TransitionModel` | |
The transition model to be associated with state | |
""" | |
# Is there a transition model linked to the prediction? | |
transition_model = getattr(prediction, "transition_model", None) | |
if transition_model is None: | |
transition_model = self.transition_model | |
return transition_model | |
def _transition_matrix(self, state, transition_model, **kwargs): | |
""" Return the transition matrix | |
Parameters | |
---------- | |
state : :class:`~.State` | |
The input state (to check for a linked prediction) | |
transition_model : :class:`~.TransitionModel` | |
The transition model to be applied to state | |
**kwargs | |
These are passed to the :meth:`matrix()` function | |
Returns | |
------- | |
: :class:`numpy.ndarray` | |
The transition matrix | |
""" | |
return transition_model.matrix(**kwargs) | |
def _smooth_gain(self, state, prediction, **kwargs): | |
"""Calculate the smoothing gain | |
Parameters | |
---------- | |
state : :class:`~.State` | |
The input state | |
prediction : :class:`~.GaussianStatePrediction` | |
The prediction (from the subsequent state) | |
Returns | |
------- | |
: Matrix | |
The smoothing gain | |
""" | |
return state.covar \ | |
@ self._transition_matrix(state, self._transition_model(prediction), **kwargs).T \ | |
@ np.linalg.inv(prediction.covar) | |
def smooth(self, track, **kwargs): | |
""" | |
Perform the backward recursion to smooth the track. | |
Parameters | |
---------- | |
track : :class:`~.Track` | |
The input track. | |
Returns | |
------- | |
: :class:`~.Track` | |
Smoothed track | |
""" | |
try: | |
self._prediction(track[0]) | |
start = 0 | |
except (ValueError, TypeError): | |
start = 1 | |
subsq_state = track[-1] | |
smoothed_states = [subsq_state] | |
for state in reversed(track[start:-1]): | |
# Delta t | |
time_interval = subsq_state.timestamp - state.timestamp | |
# Retrieve the prediction from the subsequent (k+1th) timestep accessed previously | |
prediction = self._prediction(subsq_state) | |
# The smoothing gain, mean and covariance | |
ksmooth_gain = self._smooth_gain( | |
state, prediction, time_interval=time_interval, **kwargs) | |
smooth_mean = state.state_vector + ksmooth_gain @ (subsq_state.state_vector - | |
prediction.state_vector) | |
smooth_covar = state.covar + \ | |
ksmooth_gain @ (subsq_state.covar - prediction.covar) @ ksmooth_gain.T | |
# Create a new type called SmoothedState? | |
subsq_state = type(state).from_state(state, smooth_mean, smooth_covar) | |
smoothed_states.insert(0, subsq_state) | |
if start == 1: | |
smoothed_states.insert(0, track[0]) | |
# Deep copy existing track, but avoid copying original states, as this would be super | |
# expensive. This works by informing deepcopy that the smoothed states are the | |
# replacement object for the original track states. | |
smoothed_track = copy.deepcopy(track, {id(track.states): smoothed_states}) | |
return smoothed_track | |
class ExtendedKalmanSmoother(KalmanSmoother): | |
r""" The extended version of the Kalman smoother. The equations are modified slightly, | |
analogously to the extended Kalman filter, | |
.. math:: | |
\mathbf{x}_{k|k-1} &= f_{k} (\mathbf{x}_{k-1}) | |
F_k &\approx J_f (\mathbf{x}_{k-1}) | |
where :math:`J_f (\mathbf{x}_{k-1})` is the Jacobian matrix evaluated at | |
:math:`\mathbf{x}_{k-1}`. The rest of the calculation proceeds as with the Kalman smoother. | |
In fact the first equation isn't calculated -- it's presumed to have been undertaken by a | |
filter when building the track; similarly for the predicted covariance. In practice, the only | |
difference between this and the Kalman smoother is in the use of the linearised transition | |
matrix to calculate the smoothing gain. | |
""" | |
transition_model: TransitionModel = Property(doc="The transition model to be used.") | |
def _transition_matrix(self, state, transition_model, linearisation_point=None, **kwargs): | |
r"""Returns the transition matrix, a matrix if the model is linear, or | |
approximated as Jacobian otherwise. | |
Parameters | |
---------- | |
state : :class:`~.State` | |
:math:`\mathbf{x}_{k-1}` | |
transition_model : :class:`~.TransitionModel` | |
transition model to be applied to state | |
linearisation_point : :class:`~State`, optional | |
State to linearise over. Default `None` where state will be used. | |
**kwargs : various, optional | |
These are passed to :meth:`~.TransitionModel.matrix` or | |
:meth:`~.TransitionModel.jacobian` | |
Returns | |
------- | |
: :class:`numpy.ndarray` | |
The transition matrix, :math:`F_k`, if linear (i.e. | |
:meth:`TransitionModel.matrix` exists, or | |
:meth:`~.TransitionModel.jacobian` if not) | |
""" | |
if isinstance(transition_model, LinearModel): | |
return transition_model.matrix(**kwargs) | |
else: | |
if linearisation_point is None: | |
linearisation_point = state | |
return transition_model.jacobian(linearisation_point, **kwargs) | |
class UnscentedKalmanSmoother(KalmanSmoother): | |
r"""The unscented version of the Kalman filter. As with the parent version of the Kalman | |
smoother, the mean and covariance of the prediction are retrieved from the track. The | |
unscented transform is used to calculate the smoothing gain. | |
""" | |
transition_model: TransitionModel = Property(doc="The transition model to be used.") | |
alpha: float = Property( | |
default=0.5, | |
doc="Primary sigma point spread scaling parameter. Default is 0.5.") | |
beta: float = Property( | |
default=2, | |
doc="Used to incorporate prior knowledge of the distribution. If the " | |
"true distribution is Gaussian, the value of 2 is optimal. " | |
"Default is 2") | |
kappa: float = Property( | |
default=0, | |
doc="Secondary spread scaling parameter. Default is calculated as " | |
"3-Ns") | |
def _smooth_gain(self, state, prediction, time_interval, **kwargs): | |
"""Calculate the smoothing gain | |
Parameters | |
---------- | |
state : :class:`~.State` | |
The input state | |
prediction : :class:`~.GaussianStatePrediction` | |
The prediction from the subsequent timestep | |
Returns | |
------- | |
: Matrix | |
The smoothing gain | |
""" | |
# This ensures that the time interval is correctly applied. | |
transition_function = partial( | |
self._transition_model(prediction).function, | |
time_interval=time_interval) | |
# Get the sigma points from the mean and covariance. | |
sigma_point_states, mean_weights, covar_weights = gauss2sigma( | |
state, self.alpha, self.beta, self.kappa) | |
# Use the unscented transform to return the cross-covariance | |
_, _, cross_covar, _, _, _ = unscented_transform( | |
sigma_point_states, mean_weights, covar_weights, | |
transition_function) | |
return cross_covar @ np.linalg.inv(prediction.covar) | |
from ..types.track import Track | |
from ..updater.kalman import KalmanUpdater, UnscentedKalmanUpdater | |
from ..functions import slr_definition | |
from ..models.measurement.linear import GeneralLinearGaussian | |
from ..types.hypothesis import SingleHypothesis | |
from ..predictor.kalman import AugmentedUnscentedKalmanPredictor, AugmentedKalmanPredictor | |
from ..models.measurement.base import MeasurementModel | |
class IPLSKalmanSmoother(UnscentedKalmanSmoother): | |
r"""The unscented implementation of the IPLS algorithm.""" | |
transition_model: TransitionModel = Property(doc="The transition model to be used.") | |
measurement_model: MeasurementModel = Property(default=None, doc="The measurement model to be used.") | |
alpha: float = Property( | |
default=0.5, | |
doc="Primary sigma point spread scaling parameter. Default is 0.5.") | |
beta: float = Property( | |
default=2, | |
doc="Used to incorporate prior knowledge of the distribution. If the " | |
"true distribution is Gaussian, the value of 2 is optimal. " | |
"Default is 2") | |
kappa: float = Property( | |
default=0, | |
doc="Secondary spread scaling parameter. Default is calculated as " | |
"3-Ns") | |
n_iterations: int = Property( | |
default=5, | |
doc="Number of smoothing iterations.") | |
def state_prediction_no_noise(self, state, transition_model, timestamp): | |
return AugmentedUnscentedKalmanPredictor( | |
beta=self.beta, kappa=self.kappa, transition_model=transition_model | |
).predict( | |
prior=state, | |
timestamp=timestamp | |
) | |
def measurement_prediction_no_noise(self, state, measurement_model): | |
return UnscentedKalmanUpdater( | |
beta=self.beta, kappa=self.kappa | |
).predict_measurement( | |
predicted_state=state, | |
measurement_model=measurement_model, | |
measurement_noise=False | |
) | |
def smooth(self, track): | |
""" | |
Execute the IPLS algorithm. | |
Parameters | |
---------- | |
track : :class:`~.Track` | |
The input track. | |
Returns | |
------- | |
: :class:`~.Track` | |
Smoothed track | |
""" | |
# Return the original track if 0 iterations are requested | |
global previous_state | |
if self.n_iterations == 0: | |
return track | |
# A filtered track is the input to this smoother. | |
# measurement_model = track[-1].hypothesis.measurement.measurement_model | |
smoothed_tracks = [] | |
while True: | |
# we have no test of convergence, but limited the number of iterations | |
if len(smoothed_tracks) >= self.n_iterations: | |
# warnings.warn("IPLS reached pre-specified number of iterations.") | |
break | |
print(f'IPLS iteration {len(smoothed_tracks) + 1} out of {self.n_iterations}') | |
if not smoothed_tracks: | |
# initialising by performing sigma-point smoothing via the UKF smoother | |
smoothed_track = UnscentedKalmanSmoother(transition_model=self.transition_model, | |
alpha=self.alpha, | |
beta=self.beta, | |
kappa=self.kappa).smooth(track) | |
smoothed_tracks.append(smoothed_track) | |
continue | |
track_forward = Track(track[0]) # starting the new forward track to be | |
print(len(smoothed_track)) | |
for i, current_state in enumerate(smoothed_track): | |
if i == 0: | |
previous_state = track_forward[0] | |
continue | |
""" Compute SLR parameters. """ | |
#TODO: check if any models are linear and skip linearisation | |
from stonesoup.types.prediction import Prediction | |
if issubclass(type(track[i]), Prediction): | |
transition_model = track[i].transition_model | |
else: | |
transition_model = track[i].hypothesis.prediction.transition_model | |
if transition_model is None: | |
transition_model = self.transition_model | |
trans_fun = partial( | |
self.state_prediction_no_noise, | |
transition_model=transition_model, | |
timestamp=current_state.timestamp | |
) | |
print(i) | |
f_matrix, a_vector, lambda_cov_matrix = slr_definition(previous_state, trans_fun, force_symmetry=True) | |
if not isinstance(current_state, Prediction): | |
measurement_model = current_state.hypothesis.measurement.measurement_model | |
if measurement_model is None: | |
measurement_model = self.measurement_model | |
meas_fun = partial( | |
self.measurement_prediction_no_noise, | |
measurement_model=measurement_model | |
) | |
h_matrix, b_vector, omega_cov_matrix = slr_definition(current_state, meas_fun, force_symmetry=True) | |
"Perform linear time update" | |
time_interval = current_state.timestamp-previous_state.timestamp | |
if not isinstance(current_state, Prediction): | |
transition_model = track[i].hypothesis.prediction.transition_model | |
else: | |
transition_model = track[i].transition_model | |
if transition_model is None: | |
transition_model = self.transition_model | |
q_matrix = transition_model.covar(time_interval=time_interval) | |
transition_model_linearised = LinearTransitionModel( | |
transition_matrix=f_matrix, | |
bias_value=a_vector, | |
noise_covar=lambda_cov_matrix+q_matrix | |
) | |
prediction_linear = AugmentedKalmanPredictor(transition_model_linearised).predict( | |
track_forward[-1], timestamp=current_state.timestamp | |
) | |
"Perform linear data update" | |
if not isinstance(current_state, Prediction): | |
r_matrix = measurement_model.covar() | |
measurement_model_linearized = GeneralLinearGaussian( | |
ndim_state=measurement_model.ndim_state, | |
mapping=measurement_model.mapping, | |
meas_matrix=h_matrix, | |
bias_value=b_vector, | |
noise_covar=omega_cov_matrix+r_matrix | |
) | |
# Get the actual measurement plus its prediction for the above model using the predicted pdf | |
measurement = current_state.hypothesis.measurement | |
measurement.measurement_model = measurement_model_linearized | |
hypothesis = SingleHypothesis(prediction=prediction_linear, measurement=measurement) | |
update_linear = KalmanUpdater().update(hypothesis) | |
# restores the model (ensures visualisation is OK) | |
update_linear.hypothesis.measurement.measurement_model = measurement_model | |
else: | |
update_linear = prediction_linear | |
# update_linear.hypothesis.prediction.transition_model = transition_model | |
# append the track with an update (that contains hypothesis and info needed for the backwards go) | |
track_forward.append(update_linear) | |
previous_state = current_state | |
smoothed_track = KalmanSmoother(transition_model=None).smooth(track_forward) | |
smoothed_tracks.append(smoothed_track) | |
return smoothed_tracks[-1] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment