Skip to content

Instantly share code, notes, and snippets.

@MCRE-BE
Last active November 3, 2023 11:02
Show Gist options
  • Save MCRE-BE/f40daf732886d091b0886e071abf9e75 to your computer and use it in GitHub Desktop.
Save MCRE-BE/f40daf732886d091b0886e071abf9e75 to your computer and use it in GitHub Desktop.
A simple Fourier Extrapolation implementation in Python
"""Fast Fourier Transform class.
A simple fourier based extrapolation or forecasting class. It is heavily inspired
by two gists or codes listed below.
See Also
--------
https://gist.github.com/tartakynov/83f3cd8f44208a1856ce?permalink_comment_id=2311986
https://github.com/unit8co/darts/blob/ea37dc979190dc2e8949514a6c22d22306597c05/darts/models/forecasting/fft.py#L213
Notes
-----
Requires Python >= 3.11 as we use case match implementation
"""
import dataclasses
from typing import Callable, List, Optional, Self
import numpy as np
import pandas as pd
@dataclasses.dataclass
class FourierExtrapolation:
"""Fourier Fourcasting Model.
Model that performs forecasting or extrapolation of a chosen timeseries using FFT, with frequency filtering
controlled by 'n_harmonics' and the option to detrend controlled by 'trend'.
Note
----
We assume that the Series has no missing values (NaN's, None, etc...) and that the index is a pd.DatetimeIndex.
Both are not checked.
We don't perform any corrections to match the length to a specific frequency.
Parameters
----------
n_harmonics : int, by default=10
The number of frequencies that will be used for forecasting
trend : Optional[str], by default=None
The type of trend we want to use to detrend the series before applying the DFT.
Possible values : {None, 'exp', 'poly', 'lin'}
trend_poly_degree : int, by default=3
The degree of the polynomial that will be used for detrending, if `trend='poly'`.
n_predict : int, by default=365
Number of steps to predict. Can be overwritten in predict.
Examples
--------
Simply fit the class
>>> fft = FourierExtrapolation(n_harmonics=18, trend="poly")
A simple example where we fit the class on a random signal with linear, offset, noise and sinus parts
>>> from datetime import datetime
>>> import pandas as pd
>>> import numpy as np
>>> N = 20
>>> ys = np.arange(N) * 2 + 4 + np.random.randn(N) + 4*np.sin(2*np.pi*np.arange(N)/5)
>>> index = pd.date_range(datetime.today().date(), periods=N, freq="D")
>>> data = pd.Series(ys, index=ubdex)
>>> fft = FourierExtrapolation(n_harmonics=18, trend="poly")
>>> fft = fft.fit(data)
>>> pred = fft.predict(3)
"""
n_harmonics: int = 10
trend: Optional[str] = None
trend_poly_degree: int = 3
n_predict: int = 365
# ... Public API ...
def fit(
self: Self,
X: pd.Series,
y: Optional[pd.Series] = None,
) -> Self:
"""Fit the Discrete Fourier Transformation with the provided X.
Parameters
----------
X : pd.Series
Raises
------
ValueError
In case the provided X is not of a supported dtype.
"""
# Ability to handle pd.Series and Numpy Arrays.
if isinstance(X, pd.Series):
self.X = X.values
self.Y = X.index
elif isinstance(X, np.ndarray):
self.X = X
self.Y = np.arange(0, len(X))
else:
raise ValueError("Not supported import type.")
# Save key elements
self.series = X.copy()
self.size = len(X)
# Detrend
self.t = np.arange(0, self.size)
self._compute_trend()
self.X_detrended = self.X - self.trend_func(self.t)
# perform dft and get the frequency domain
self.fft_values = np.fft.fft(self.X_detrended)
self.freq = np.fft.fftfreq(self.size)
# Get the chosen amount of harmonics
indexes = list(range(self.size))
indexes.sort(key=lambda x: np.absolute(self.freq[x]))
self.indexes = indexes[:1 + self.n_harmonics * 2]
return self
def predict(
self: Self,
n_predict: Optional[int] = None,
) -> pd.Series:
"""Predict the chosen timesteps in the future.
Parameters
----------
n_predict : Optional[int], by default None
Number of steps to predict if we want to overwrite the value provided in __init__
Returns
-------
pd.Series
_description_
"""
# Compute the final trend
if n_predict is not None: self.n_predict = n_predict
t = np.arange(0, self.size + self.n_predict)
trend = [self.trend_func(i) for i in t]
trend = np.array(trend)
# Compute the FFT forecast
yf = self._compute_fourier(
n=self.size,
n_predict=self.n_predict,
indexes=self.indexes,
x_freqdom=self.fft_values,
f=self.freq,
)
# Output to a pd.Series
index = pd.date_range(self.series.index[0], periods=t.size, freq="D")
out = pd.Series(yf + trend, index=index)
return out
def fit_predict(
self: Self,
X: pd.Series,
y: Optional[pd.Series],
) -> pd.Series:
"""Fit and predict in one single step."""
return self.fit(X).predict()
def plot(self: Self) -> None:
"""Plot the provided training data and the extrapolated FFT result."""
# --- Import ---
import matplotlib.pyplot as plt
# --- Script ---
plt.plot(self.series, 'b', label='x', linewidth=1)
plt.plot(
self.predict(self.n_predict),
'r',
label='extrapolation',
)
plt.legend()
plt.show()
# ... Private API ...
@staticmethod
def _compute_fourier(
n: int,
n_predict: int,
indexes: List[int],
x_freqdom: np.ndarray,
f: np.ndarray,
) -> np.ndarray:
"""Inverse the fourier decomposition manually.
Parameters
----------
n : int
Number of samples in the training data
n_predict : int
Number of steps to forecast after the end of the training data
indexes : List[int]
The relevant harmonics to plot
x_freqdom : np.ndarray
The computed FFT by np.fft.fft
f : np.ndarray
The Discrete Fourier Transformation sample frequencies
Returns
-------
nd.array
"""
t = np.arange(0, n + n_predict)
restored_sig = np.zeros(t.size)
for i in indexes:
ampli = np.absolute(x_freqdom[i]) / n # amplitude
phase = np.angle(x_freqdom[i]) # phase
restored_sig += ampli * np.cos(2 * np.pi * f[i] * t + phase)
return restored_sig
# Trend computations
def _compute_trend(self: Self) -> Self:
"""Set the attributes for the trend computation.
Attributes
----------
trend_coef : np.ndarray
trend_func : Callable
"""
y = self.series.astype(int)
y = y.values # pass it as a numpy array
match self.trend:
case "poly":
self.trend_coef = np.polyfit(self.t, y, self.trend_poly_degree)
self.trend_func = self._poly_trend(self.trend_coef)
case "exp":
self.trend_coef = np.polyfit(self.t, np.log(y), 1)
self.trend_func = self._exp_trend
case "lin":
self.trend_coef = np.polyfit(self.t, y, 1)
self.trend_func = self._poly_trend(self.trend_coef)
case _:
self.trend_coef = None
self.trend_func = self._null_trend
return self
def _trend_exp(self: Self, x: int) -> Callable:
"""Helper function for an exponential trend.
See Also
--------
https://stackoverflow.com/a/3433503/20716078
"""
coef = self.trend_coef
return np.exp(coef[1]) * np.exp(coef[0] * x)
@staticmethod
def _poly_trend(trend_coef, *args) -> Callable:
"""Helper function to simulate a polynomial trend with chosen degree."""
return np.poly1d(trend_coef)
@staticmethod
def _null_trend(*args, **kwargs) -> Callable:
"""Helper function to simulate a flat trend."""
return 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment