Created
January 27, 2023 01:00
-
-
Save tokahuke/4c61a9cdf54bee9ec0a29f9485869f7a to your computer and use it in GitHub Desktop.
Adaptative moving average using a Hidden Auto-Regressive process
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## | |
# Copyright 2023 Pedro Bittencourt Arruda | |
# This work is licensed under a Creative Commons Attribution 4.0 International License. | |
# | |
# | |
# # TL; DR: | |
# | |
# Use the function `smooth` to put in a list of numbers and get the smoothed list of | |
# numbers as the returned value. If you care about confidence intervals `smooth_full` | |
# will als give you the standard deviations. | |
# | |
# Still no sure how this works? Run this script with Python to get a picture of what is | |
# going on. | |
# | |
# | |
# # What is this all about? | |
# | |
# This is an EM implementation for an adaptative smoother based on the following model: | |
# | |
# y[n] = x[n] + v[n] | |
# x[n+1] = x[n] + u[n] | |
# | |
# Basically, the simplest Kalman Filter-type linear system you could hope for. What | |
# follows is the dump of a demonstration of the algorithm (untranslated!), but the gist | |
# is: | |
# | |
# - Use the Forward-Backward algorithm to calculate the latent (hidden) states x[i|n] and | |
# their variances. | |
# - Having the hidden state calculated it's relatively straightforward to use the EM | |
# algorithm to create an iterative calibration procedure for the parameters. | |
# | |
# | |
# ========== BEGIN MATHS INFODUMP ========== | |
# y[n] = x[n] + v[n] | |
# x[n+1] = x[n] + u[n] | |
# | |
# x[n+1|n] = x[n|n] | |
# v[n+1|n] = v[n|n] + vu | |
# | |
# x[n|n] = x[n|n-1] + v[n|n-1] / (v[n|n-1] + vv) * (yn - x[n|n-1]) | |
# v[n|n] = v[n+1|n] - v[n|n-1] / (v[n|n-1] + vv) * v[n|n-1] | |
# = v[n|n-1] * vv / (v[n|n-1] + vv) | |
# = v[n|n-1] || vv | |
# | |
# | |
# x[n|n] = x[n-1|n-1] + v[n|n-1] / (v[n|n-1] + vv) * (yn - x[n-1|n-1]) | |
# v[n|n] = (v[n-1|n-1] + vu) || vv | |
# | |
# Se eu quiser amostrar: | |
# | |
# Amostrei x[n]. | |
# | |
# E[x[n-1]] = x[n-1|n-1] + v[n-1|n-1] / (v[n-1|n-1] + vu) * (x[n] - x[n-1|n-1]) | |
# var[x[n-1]] = v[n-1|n-1] || vu | |
# | |
# Mas se eu tenho variância v[n|N] e média x[n|N]... | |
# | |
# x[n-1|N] = x[n-1|n-1] + v[n-1|n-1] / (v[n-1|n-1] + vu) * (x[n|N] - x[n-1|n-1]) | |
# v[n-1|N] = v[n-1|n-1] || vu + [v[n-1|n-1] / (v[n-1|n-1] + vu)] ^ 2 v[n|N] | |
# = v[n-1|n-1] || vu + (v[n-1|n-1] || vu) ^ 2 * v[n|N] / vu ^ 2 | |
# | |
# Agora, para a diferença X[n|N] - X[n-1|N], nós procedemos da mesma maneira. | |
# | |
# E[diff] = vu / (v[n-1|n-1] + vu) * (x[n] - x[n-1|n-1]) | |
# var[diff] = v[n-1|n-1] || vu | |
# | |
# Mas se eu tenho variâncias v[n|N] e médias x[n|N]... | |
# | |
# diff[n-1|N] = vu / (v[n-1|n-1] + vu) * (x[n|N] - x[n-1|n-1]) | |
# var_diff[n-1|N] = v[n-1|n-1] || vu + [vu / (v[n-1|n-1] + vu)] ^ 2 v[n|N] | |
# = v[n-1|n-1] || vu + (v[n-1|n-1] || vu) ^ 2 * v[n|N] / v[n-1|n-1] ^ 2 | |
# | |
# E, com isso, conseguimos descobrir E[diff^2[n-1|N]], que é nossa quantidade de interesse | |
# no EM. | |
# | |
# ========== END MATHS INFODUMP ========== | |
## | |
from dataclasses import dataclass | |
from typing import List, Generator, Tuple, Iterable | |
def par(a, b): | |
return a * b / (a + b) | |
def avg(a, b, wa, wb): | |
return (wa * a + wb * b) / (wa + wb) | |
BIG_VAR = float(1e12) | |
@dataclass | |
class HiddenFluctuation(object): | |
""" | |
Models the one-state, one-output Kalman filter. | |
""" | |
var_state: float | |
"""Variance of the state. How much the "trend" fluctuates from sample to sample.""" | |
var_out: float | |
""" | |
Variance of the output. How much the output deviates from the trend in each sample. | |
""" | |
def _smooth_forward( | |
self, it: Iterable[float] | |
) -> Generator[Tuple[float, float], None, None]: | |
"""Calculates the forward step of the Forward-Backward algorithm.""" | |
state = 0 | |
var = BIG_VAR | |
for sample in it: | |
state = avg(state, sample, self.var_out, var + self.var_state) | |
var = par(var + self.var_state, self.var_out) | |
yield state, var | |
def _smooth_backward( | |
self, forward: List[Tuple[float, float]] | |
) -> Generator[Tuple[float, float], None, None]: | |
"""Calculates the backward step of the Forward-Backward algorithm.""" | |
state, var = forward[-1] | |
yield state, var | |
for causal_state, causal_var in forward[-2::-1]: | |
state = avg(causal_state, state, self.var_state, causal_var) | |
var_simple = par(causal_var, self.var_state) | |
var = var_simple + var_simple**2 * var / self.var_state**2 | |
yield state, var | |
def smooth(self, samples: Iterable[float]) -> List[Tuple[float, float]]: | |
forward = list(self._smooth_forward(samples)) | |
backward = list(self._smooth_backward(forward))[::-1] | |
return backward | |
def _state_noises( | |
self, forward: List[Tuple[float, float]] | |
) -> Generator[Tuple[float, float], None, None]: | |
state, var = forward[-1] | |
for causal_state, causal_var in forward[1::-1]: | |
state = avg(causal_state, state, self.var_state, causal_var) | |
var_simple = par(causal_var, self.var_state) | |
var = var_simple + var_simple**2 * var / self.var_state**2 | |
diff = ( | |
self.var_state / (causal_var + self.var_state) * (state - causal_state) | |
) | |
var_diff = var_simple + var_simple**2 * var / causal_var**2 | |
yield diff, var_diff | |
def em_step(self, samples: List[float]): | |
""" | |
Uses a list of samples to create another `HiddenFluctuation` object which is | |
strictly better at explaining the samples than the current one. Run this function | |
a bunch of times and you get "somewhere pretty good" (not necessarily the best). | |
""" | |
forward = list(self._smooth_forward(samples)) | |
state_noises = list(self._state_noises(forward))[::-1] | |
new_var_state = sum( | |
diff**2 + var_diff for diff, var_diff in state_noises | |
) / len(state_noises) | |
new_var_output = sum( | |
(t[0] - sample) ** 2 + t[1] for t, sample in zip(forward, samples) | |
) / len(forward) | |
return HiddenFluctuation(new_var_state, new_var_output) | |
@classmethod | |
def learn(cls, samples: List[float]): | |
"""Learns a `HiddenFluctuation` from a sample""" | |
learned = HiddenFluctuation(1.0, 1.0) # ... because! | |
for _i in range(100): | |
learned = learned.em_step(samples) | |
return learned | |
def smooth(samples: List[float]) -> List[float]: | |
"""Smooths a list of samples""" | |
model = HiddenFluctuation.learn(samples) | |
smoothed = model.smooth(samples) | |
return [mean for mean, stddev in smoothed] | |
def smooth_full(samples: List[float]) -> List[Tuple[float, float]]: | |
"""Smooths a list of samples. Returns expected values and standard deviations.""" | |
model = HiddenFluctuation.learn(samples) | |
smoothed = model.smooth(samples) | |
return smoothed | |
# Now, let's test it! | |
if __name__ == "__main__": | |
from random import gauss | |
from matplotlib import pyplot as plt | |
def gen_hidden_fluctuation(var_state, var_output, n): | |
state = 0 | |
for _ in range(n): | |
output = gauss(state, var_output) | |
yield output | |
state = gauss(state, var_state) | |
sample = list(gen_hidden_fluctuation(1.0, 3.0, 365)) | |
learned = HiddenFluctuation.learn(sample) | |
print(learned) | |
smoothed = [mean for mean, _ in learned.smooth(sample)] | |
print(smoothed) | |
plt.plot(sample, ".", smoothed, "r") | |
plt.show() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is a script I created a good while ago with the objective of tracking my weight. Don't ask me about the details; by now, I just know how the big picture works! If you are interested, there is a maths infodump in the file; if not, just
from ama import smooth
and never worry again in your life about calibrating moving averages.