@tokahuke
Created January 27, 2023 01:00
Adaptive moving average using a hidden auto-regressive process
##
# Copyright 2023 Pedro Bittencourt Arruda
# This work is licensed under a Creative Commons Attribution 4.0 International License.
#
#
# # TL;DR:
#
# Pass a list of numbers to the function `smooth` and get the smoothed list of
# numbers back as the return value. If you care about confidence intervals,
# `smooth_full` will also give you the standard deviations.
#
# Still not sure how this works? Run this script with Python to get a picture of what
# is going on.
#
# # What is this all about?
#
# This is an EM (Expectation-Maximization) implementation of an adaptive smoother
# based on the following model:
#
# y[n] = x[n] + v[n]
# x[n+1] = x[n] + u[n]
#
# Basically, the simplest Kalman filter-type linear system you could hope for. What
# follows is the dump of a derivation of the algorithm, but the gist is:
#
# - Use the Forward-Backward algorithm to calculate the latent (hidden) states x[i|n]
# and their variances.
# - Having calculated the hidden states, it's relatively straightforward to use the
# EM algorithm to create an iterative calibration procedure for the parameters.
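#
# To make the model concrete, here is a minimal generative sketch (illustrative only;
# it is essentially what `gen_hidden_fluctuation` in the demo at the bottom of this
# file does; `gauss` takes a standard deviation, hence the square roots):
#
#     from random import gauss
#
#     def generate(var_state, var_out, n):
#         state = 0.0
#         for _ in range(n):
#             yield gauss(state, var_out ** 0.5)      # y[n] = x[n] + v[n]
#             state = gauss(state, var_state ** 0.5)  # x[n+1] = x[n] + u[n]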
#
#
# ========== BEGIN MATHS INFODUMP ==========
# y[n] = x[n] + v[n]
# x[n+1] = x[n] + u[n]
#
# x[n+1|n] = x[n|n]
# v[n+1|n] = v[n|n] + vu
#
# x[n|n] = x[n|n-1] + v[n|n-1] / (v[n|n-1] + vv) * (y[n] - x[n|n-1])
# v[n|n] = v[n|n-1] - v[n|n-1] / (v[n|n-1] + vv) * v[n|n-1]
# = v[n|n-1] * vv / (v[n|n-1] + vv)
# = v[n|n-1] || vv
#
# (where a || b := a * b / (a + b), the "parallel resistors" combination; see the
# `par` helper below)
#
#
# x[n|n] = x[n-1|n-1] + v[n|n-1] / (v[n|n-1] + vv) * (y[n] - x[n-1|n-1])
# v[n|n] = (v[n-1|n-1] + vu) || vv
#
# If I want to sample:
#
# Having sampled x[n]:
#
# E[x[n-1]] = x[n-1|n-1] + v[n-1|n-1] / (v[n-1|n-1] + vu) * (x[n] - x[n-1|n-1])
# var[x[n-1]] = v[n-1|n-1] || vu
#
# But if I have variance v[n|N] and mean x[n|N]...
#
# x[n-1|N] = x[n-1|n-1] + v[n-1|n-1] / (v[n-1|n-1] + vu) * (x[n|N] - x[n-1|n-1])
# v[n-1|N] = v[n-1|n-1] || vu + [v[n-1|n-1] / (v[n-1|n-1] + vu)] ^ 2 v[n|N]
# = v[n-1|n-1] || vu + (v[n-1|n-1] || vu) ^ 2 * v[n|N] / vu ^ 2
#
# Now, for the difference x[n|N] - x[n-1|N], we proceed in the same way.
#
# E[diff] = vu / (v[n-1|n-1] + vu) * (x[n] - x[n-1|n-1])
# var[diff] = v[n-1|n-1] || vu
#
# But if I have variances v[n|N] and means x[n|N]...
#
# diff[n-1|N] = vu / (v[n-1|n-1] + vu) * (x[n|N] - x[n-1|n-1])
# var_diff[n-1|N] = v[n-1|n-1] || vu + [vu / (v[n-1|n-1] + vu)] ^ 2 v[n|N]
# = v[n-1|n-1] || vu + (v[n-1|n-1] || vu) ^ 2 * v[n|N] / v[n-1|n-1] ^ 2
#
# And, with that, we can find E[diff^2[n-1|N]], which is our quantity of interest
# in the EM.
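#
# For the record, the EM update implemented in `em_step` below then boils down to
# (sums over all n, with N samples in total):
#
# vu' = (1/N) * sum(diff[n|N]^2 + var_diff[n|N])
# vv' = (1/N) * sum((x[n|n] - y[n])^2 + v[n|n])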
#
# ========== END MATHS INFODUMP ==========
##
from dataclasses import dataclass
from typing import List, Generator, Tuple, Iterable


def par(a, b):
    """Combines two variances "in parallel": the `||` operator from the infodump."""
    return a * b / (a + b)


def avg(a, b, wa, wb):
    """Weighted average of `a` and `b` with weights `wa` and `wb`."""
    return (wa * a + wb * b) / (wa + wb)


BIG_VAR = float(1e12)
"""A huge variance, standing in for a completely uninformative prior on the state."""


@dataclass
class HiddenFluctuation(object):
    """
    Models the one-state, one-output Kalman filter.
    """

    var_state: float
    """Variance of the state. How much the "trend" fluctuates from sample to sample."""
    var_out: float
    """
    Variance of the output. How much the output deviates from the trend in each sample.
    """

    def _smooth_forward(
        self, it: Iterable[float]
    ) -> Generator[Tuple[float, float], None, None]:
        """Calculates the forward step of the Forward-Backward algorithm."""
        state = 0.0
        var = BIG_VAR
        for sample in it:
            # Predict (add var_state), then correct with the incoming sample:
            state = avg(state, sample, self.var_out, var + self.var_state)
            var = par(var + self.var_state, self.var_out)
            yield state, var

    def _smooth_backward(
        self, forward: List[Tuple[float, float]]
    ) -> Generator[Tuple[float, float], None, None]:
        """Calculates the backward step of the Forward-Backward algorithm."""
        state, var = forward[-1]
        yield state, var
        for causal_state, causal_var in forward[-2::-1]:
            # Fold the filtered estimate at n-1 into the smoothed estimate at n:
            state = avg(causal_state, state, self.var_state, causal_var)
            var_simple = par(causal_var, self.var_state)
            var = var_simple + var_simple**2 * var / self.var_state**2
            yield state, var

    def smooth(self, samples: Iterable[float]) -> List[Tuple[float, float]]:
        """Runs Forward-Backward on the samples. Returns smoothed means and variances."""
        forward = list(self._smooth_forward(samples))
        backward = list(self._smooth_backward(forward))[::-1]
        return backward

    def _state_noises(
        self, forward: List[Tuple[float, float]]
    ) -> Generator[Tuple[float, float], None, None]:
        """
        Calculates the smoothed state increments diff[n|N] and their variances, the
        expected sufficient statistics for the EM step.
        """
        state, var = forward[-1]
        for causal_state, causal_var in forward[-2::-1]:
            # The increment estimate uses x[n|N] and v[n|N], so calculate it before
            # updating `state` and `var` to the smoothed values at n-1:
            diff = (
                self.var_state / (causal_var + self.var_state) * (state - causal_state)
            )
            var_simple = par(causal_var, self.var_state)
            var_diff = var_simple + var_simple**2 * var / causal_var**2
            state = avg(causal_state, state, self.var_state, causal_var)
            var = var_simple + var_simple**2 * var / self.var_state**2
            yield diff, var_diff

    def em_step(self, samples: List[float]):
        """
        Uses a list of samples to create another `HiddenFluctuation` object which is
        strictly better at explaining the samples than the current one. Run this
        function a bunch of times and you get "somewhere pretty good" (not
        necessarily the best).
        """
        forward = list(self._smooth_forward(samples))
        state_noises = list(self._state_noises(forward))[::-1]
        # M-step: the new variances are the means of the expected squared noises.
        new_var_state = sum(
            diff**2 + var_diff for diff, var_diff in state_noises
        ) / len(state_noises)
        new_var_output = sum(
            (t[0] - sample) ** 2 + t[1] for t, sample in zip(forward, samples)
        ) / len(forward)
        return HiddenFluctuation(new_var_state, new_var_output)

    @classmethod
    def learn(cls, samples: List[float]):
        """Learns a `HiddenFluctuation` from a sample."""
        learned = HiddenFluctuation(1.0, 1.0)  # ... because!
        for _i in range(100):
            learned = learned.em_step(samples)
        return learned


def smooth(samples: List[float]) -> List[float]:
    """Smooths a list of samples."""
    model = HiddenFluctuation.learn(samples)
    smoothed = model.smooth(samples)
    return [mean for mean, _var in smoothed]


def smooth_full(samples: List[float]) -> List[Tuple[float, float]]:
    """Smooths a list of samples. Returns expected values and standard deviations."""
    model = HiddenFluctuation.learn(samples)
    # The smoother returns variances; take square roots to get the standard
    # deviations promised above.
    return [(mean, var**0.5) for mean, var in model.smooth(samples)]
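

# For example, to plot a ±2σ confidence band around the trend (a sketch; assumes
# matplotlib is installed and `samples` is your list of numbers):
#
#     from matplotlib import pyplot as plt
#
#     pairs = smooth_full(samples)
#     means = [m for m, _ in pairs]
#     lower = [m - 2 * s for m, s in pairs]
#     upper = [m + 2 * s for m, s in pairs]
#     plt.plot(samples, ".")
#     plt.plot(means, "r")
#     plt.fill_between(range(len(means)), lower, upper, alpha=0.3)
#     plt.show()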


# Now, let's test it!
if __name__ == "__main__":
    from random import gauss

    from matplotlib import pyplot as plt

    def gen_hidden_fluctuation(var_state, var_output, n):
        """Draws a sample path from the generative model."""
        state = 0.0
        for _ in range(n):
            # `gauss` takes a standard deviation, hence the square roots:
            output = gauss(state, var_output**0.5)
            yield output
            state = gauss(state, var_state**0.5)

    sample = list(gen_hidden_fluctuation(1.0, 3.0, 365))
    learned = HiddenFluctuation.learn(sample)
    print(learned)

    smoothed = [mean for mean, _ in learned.smooth(sample)]
    print(smoothed)

    plt.plot(sample, ".", smoothed, "r")
    plt.show()
@tokahuke (Author)

This is a script I created a good while ago with the objective of tracking my weight. Don't ask me about the details; by now, I just know how the big picture works! If you are interested, there is a maths infodump in the file; if not, just `from ama import smooth` and never worry again in your life about calibrating moving averages.
