Skip to content

Instantly share code, notes, and snippets.

@mjhajharia
Created August 20, 2021 23:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mjhajharia/fd5e6dfe647474d692ddea86a1bd7dab to your computer and use it in GitHub Desktop.
Save mjhajharia/fd5e6dfe647474d692ddea86a1bd7dab to your computer and use it in GitHub Desktop.
# import aesara.tensor as at
import numpy as np
import pandas as pd
# from aesara import scan
# from scipy import stats
#
import pymc3 as pm
from pymc3.distributions import distribution
from pymc3.distributions.continuous import (
Beta,
Cauchy,
ChiSquared,
Exponential,
Gamma,
InverseGamma,
Laplace,
Normal,
StudentT,
Uniform,
)
class SARIMA(distribution.Continuous):
"""
.. math::
SARIMA_{(p, d, q) \times (P, D, Q)_s} can be represented as
\\phi_p (L) \tilde \\phi_P (L^s) \\Delta^d \\Delta_s^D y_t = A(t) +
\theta_q (L) \tilde \theta_Q (L^s) \\zeta_t
.. math::
"""
def __init__(
self, order, seasonal, ts=None, period=None, xreg=None, name=None, *args, **kwargs
):
super().__init__()
self.sigma0 = None
self.mu0 = None
if ts is not None:
self.y = np.array(ts).astype(float)
self.yreal = pd.TimeSeries(ts)
self.n = len(self.y)
# series name
if name is not None:
sn = str(name)
else:
ts = dict()
sn = f"{ts=}".split("=")[0]
dimension = 1
# order
if order[1] >= 0:
self.p = order[1]
if order[2] >= 0:
self.q = order[2]
if order[3] >= 0:
self.d = order[3]
# Period
if period == 0:
self.period = self.yreal.freq
else:
self.period = period
# Seasonal Order
self.sp = seasonal[1]
self.sq = seasonal[2]
self.dd = seasonal[3]
if self.period <= 1:
self.pp = 0
self.sq = 0
self.dd = 0
self.phi = np.array(self.p) # ar parameters
self.phi0 = np.linspace(self.p, -1, 1)
self.sphi = np.array(self.sp) # sar parameters
self.sphi0 = np.linspace(self.sp, -1, 1)
self.theta = np.array(self.q) # ma parameters
self.theta0 = np.linspace(self.q, -1, 1)
self.stheta = np.array(self.sq) # sma parameters
self.stheta0 = np.linspace(self.sq, -1, 1)
self.mu = np.array(self.n1) # Mean pparameter
self.epsilon = np.array(self.n1) # residual parameter
self.dinits = self.d + (self.period * self.dd)
self.n1 = self.n
if self.xreg is not None:
assert (
all(isinstance(ele, list) for ele in self.xreg) == True
), "xreg has to be a matrix with row dimension as same as the length of the time series"
assert (
self.xreg.shape[0] == self.n
), "The length of xreg don't match with the length of the time series"
assert self.dd == 0, "seasonal difference is not allowed in dynamic regressions D = 0 "
self.dd = 0
self.d1 = self.xreg.shape[1]
self.breg = np.array(self.d1)
if self.d > 0:
self.xreg = np.diff(self.xreg, n=self.d)
self.xlast = np.array((self.d, self.d1))
self.xlast[
1,
] = self.xreg[-1]
if self.d > 1:
for i in range(2, self.d):
self.xlast[i,] = np.diff(
self.xreg, n=i - 1
)[-1]
else:
self.d1 = 0
self.n2 = self.n1 - self.d - (self.period * self.dd)
self.xreg = np.tile([0, self.d1 * self.n2], (self.d1, self.n2))
# Default Priors
self.prior_sigma0 = StudentT.dist(mu=0, sigma=1, nu=7)
self.prior_mu0 = StudentT.dist(mu=0, sigma=2.5, nu=6)
self.prior_ar = Normal.dist(mu=0, sigma=0.5)
self.prior_ma = Normal.dist(mu=0, sigma=0.5)
self.prior_sar = Normal.dist(mu=0, sigma=0.5)
self.prior_sma = Normal.dist(mu=0, sigma=0.5)
self.prior_breg = StudentT.dist(mu=0, sigma=2.5, nu=6)
def transformation_coefficients(self, number, hyperparameter, parameter, parameter0):
for i in range(number):
if hyperparameter[i, 4] == 1:
parameter[i] = parameter0[i]
else:
parameter[i] = 2 * abs(parameter0[i]) - 1
return parameter
def transform(self):
self.phi = self.transformation_coefficients(self.p, self.phi, self.phi0, self.prior_ar)
self.theta = self.transformation_coefficients(
self.q, self.theta, self.theta0, self.prior_ma
)
self.sphi = self.transformation_coefficients(self.sp, self.sphi, self.sphi0, self.prior_sar)
self.stheta = self.transformation_coefficients(
self.sq, self.stheta, self.stheta0, self.prior_sma
)
def arma_estimation(self):
if self.d1 > 0:
mu = self.xreg * self.breg
else:
mu = np.zeros(self.n)
for i in range(self.n):
mu[i] += self.mu0
for j in range(self.p):
if i > j:
mu[i] += self.y[i - j] * self.phi[j]
for j in range(self.q):
if i > j:
mu[i] += self.epsilon[i - j] * self.theta[j]
for j in range(self.sp):
if i > (self.period * j):
mu[i] += self.y[i - (self.period * j)] * self.sphi[j]
for j in range(self.sq):
if i > (self.period * j):
mu[i] += self.epsilon[i - (self.period * j)] * self.stheta[j]
def invchi(self, x, chisquare):
return (1.0 / x ** 2) * chisquare.logp(1 / x)
def priorint(self, prior, x, target, n, dist):
if prior == n:
target += dist.logp(x)
def target_hyperparam(self, target, prior_x0, prior_x01, prior_x02, prior_x03, x0, three=False):
# prior mu
self.priorint(prior_x0, x0, target, 1, Normal.dist(mu=prior_x01, sigma=prior_x02))
self.priorint(prior_x0, x0, target, 2, Beta.dist(alpha=prior_x01, beta=prior_x02))
self.priorint(prior_x0, x0, target, 3, Uniform.dist(lower=prior_x01, upper=prior_x02))
if three:
return target
else:
self.priorint(
prior_x0,
x0,
target,
4,
StudentT.dist(mu=prior_x01, sigma=prior_x02, nu=prior_x03),
)
self.priorint(prior_x0, x0, target, 5, Cauchy.dist(alpha=prior_x01, beta=prior_x02))
self.priorint(
prior_x0, x0, target, 7, InverseGamma.dist(alpha=prior_x01, beta=prior_x02)
)
if prior_x0 == 7:
target += self.invchi(
x0, pm.ChiSquared("target", nu=prior_x03)
) # inverse chi square
if prior_x0 == 8:
target += -np.log(self.sigma0)
self.priorint(prior_x0, x0, target, 9, Gamma.dist(alpha=prior_x01, beta=prior_x03))
self.priorint(prior_x0, x0, target, 10, Exponential.dist(lam=prior_x02))
self.priorint(prior_x0, x0, target, 11, ChiSquared.dist(nu=prior_x03))
self.priorint(prior_x0, x0, target, 12, Laplace.dist(mu=prior_x01, b=prior_x02))
return target
def prior_for_sigma0(self, target, prior_sigma0, sigma0):
self.target_hyperparam(
target,
prior_sigma0[4],
prior_sigma0[1],
prior_sigma0[2],
prior_sigma0[3],
sigma0,
three=False,
)
return target
def prior_for_mu0(self, target, prior_mu0, mu0):
self.target_hyperparam(
target, prior_mu0[4], prior_mu0[1], prior_mu0[2], prior_mu0[3], mu0, three=False
)
return target
def prior_for_breg(self, target, prior_breg, breg):
if self.d1 > 0:
for i in range(self.d1):
self.target_hyperparam(
target,
prior_breg[i, 4],
prior_breg[i, 1],
prior_breg[i, 2],
prior_breg[i, 3],
breg[i],
three=True,
)
return target
def ar_ma_priors(self, x, target, prior_ar_ma, ar_ma):
if x > 0:
for i in range(x):
self.target_hyperparam(
target,
prior_ar_ma[i, 4],
prior_ar_ma[i, 1],
prior_ar_ma[i, 2],
ar_ma[i],
three=True,
)
def prior_ar(self, p, target, prior_ar, phi0):
return self.ar_ma_priors(p, target, prior_ar, phi0)
def prior_ma(self, q, target, prior_ma, theta0):
return self.ar_ma_priors(q, target, prior_ma, theta0)
def prior_sar(self, sp, target, prior_sar, sphi0):
return self.ar_ma_priors(sp, target, prior_sar, sphi0)
def prior_sma(self, sq, target, prior_sma, stheta0):
return self.ar_ma_priors(sq, target, prior_sma, stheta0)
def get_order_arima(self):
return dict(
{
"p": self.p,
"d": self.d,
"q": self.q,
"sp": self.sp,
"dd": self.dd,
"sq": self.sq,
"d1": self.d1,
"period": self.period,
}
)
def max_order_arima(self):
return max(self.p, self.q, self.period, self.sp, self.period * self.sq)
def likelihood(self, target):
target += Normal("target", mu=0, sigma=self.sigma0).logp(self.epsilon)
def generated_quantities(self):
loglik = 0
log_lik = np.array(self.n1)
fit = np.array(self.n)
residuals = np.array(self.n)
for i in range(self.n):
if i <= self.dinits:
residuals[i] = Normal.dist(mu=0, sigma=self.sigma0)
else:
residuals[i] = Normal.dist(mu=self.epsilon[i - self.dinits], sigma=self.sigma0)
fit[i] = self.yreal[i] - residuals[i]
if i <= self.n1:
log_lik[i] = Normal.dist(mu=self.mu[i], sigma=self.sigma0).logp(self.y[i])
loglik += log_lik[i]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment