Markov Switching Models - first pass at the Filter / Maximum Likelihood Estimation.
@ChadFulton, September 15, 2013
import numpy as np
cimport numpy as np
cimport cython

DTYPE = np.float64
ctypedef np.float64_t dtype_t


@cython.boundscheck(False)
@cython.wraparound(False)
def hamilton_filter(int nobs,
                    int nstates,
                    int order,
                    np.ndarray[dtype_t, ndim=1] transition_vector not None,
                    np.ndarray[dtype_t, ndim=2, mode='c'] joint_probabilities not None,
                    np.ndarray[dtype_t, ndim=2, mode='c'] marginal_conditional_densities not None):
    cdef np.ndarray[dtype_t, ndim=1] joint_densities, marginal_densities
    cdef np.ndarray[dtype_t, ndim=1] _joint_probabilities, _marginal_conditional_densities
    cdef int nstatesk_1, nstatesk, nstatesk1, t, i, j, k, idx
    cdef dtype_t transition

    nstatesk_1 = nstates**(order-1)  # M^(k-1)
    nstatesk = nstatesk_1*nstates    # M^k
    nstatesk1 = nstatesk*nstates     # M^(k+1)

    joint_densities = np.zeros((nstatesk1,))
    marginal_densities = np.zeros((nobs,))

    for t in range(1, nobs+1):
        _joint_probabilities = joint_probabilities[t-1]
        _marginal_conditional_densities = marginal_conditional_densities[t-1]

        # Joint density of y_t and the (k+1)-tuple of states
        # (S_t, ..., S_{t-k}), accumulating the marginal density of y_t
        for i in range(nstates):
            for j in range(nstates):
                transition = transition_vector[i*nstates + j]
                for k in range(nstatesk_1):
                    idx = j*nstatesk_1 + k
                    joint_densities[i*nstatesk + idx] = (
                        transition * _joint_probabilities[idx] *
                        _marginal_conditional_densities[i*nstatesk + idx]
                    )
                    marginal_densities[t-1] += joint_densities[i*nstatesk + idx]

        # Filtered joint probabilities of (S_t, ..., S_{t-k+1}): sum the
        # oldest state out of the joint densities and divide by the
        # marginal density of y_t. Write directly into row t rather than
        # into the view on row t-1, so earlier rows are not overwritten.
        for i in range(nstatesk):
            joint_probabilities[t, i] = 0
            idx = i*nstates
            for j in range(nstates):
                joint_probabilities[t, i] += joint_densities[idx+j] / marginal_densities[t-1]

    return marginal_densities, joint_probabilities
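
# A minimal usage sketch (an assumption: this file is compiled as the
# `mar_c` extension module that the Python module below imports):
#
#     from mar_c import hamilton_filter
#     marginal_densities, joint_probabilities = hamilton_filter(
#         nobs, nstates, order, transition_vector,
#         joint_probabilities, marginal_conditional_densities)
#
# where the three array arguments are built by MAR.initialize_filter below.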
"""
Markov Autoregressive Models
Author: Chad Fulton
License: BSD
References
----------
Hamilton, James D. 1989.
"A New Approach to the Economic Analysis of
Nonstationary Time Series and the Business Cycle."
Econometrica 57 (2) (March 1): 357-384.
Hamilton, James D. 1994.
Time Series Analysis.
Princeton, N.J.: Princeton University Press.
Kim, Chang-Jin, and Charles R. Nelson. 1999.
"State-Space Models with Regime Switching:
Classical and Gibbs-Sampling Approaches with Applications".
MIT Press Books. The MIT Press.
"""
from __future__ import division

from functools import reduce  # needed on Python 3; a builtin on Python 2

import numpy as np
import pandas as pd

import statsmodels.tsa.base.tsa_model as tsbase
import statsmodels.base.model as base
from statsmodels.base import data
from statsmodels.tsa.tsatools import add_constant, lagmat
from statsmodels.regression.linear_model import OLS, OLSResults
from statsmodels.tools.numdiff import approx_fprime
from statsmodels.tools.decorators import (cache_readonly, cache_writable,
                                          resettable_cache)
import statsmodels.base.wrapper as wrap

from scipy import stats

from mar_c import hamilton_filter
class MAR(base.LikelihoodModel):
    """
    An autoregressive model of order k with a first-order, M-state
    Markov-switching mean and variance.

    Parameters
    ----------
    endog : array-like
        The endogenous variable. Assumed not to be in deviation-from-mean
        form.
    order : integer
        The order of the autoregressive parameters.
    nstates : integer
        The number of states in the Markov chain.

    References
    ----------
    Kim, Chang-Jin, and Charles R. Nelson. 1999.
    "State-Space Models with Regime Switching:
    Classical and Gibbs-Sampling Approaches with Applications".
    MIT Press Books. The MIT Press.

    Notes
    -----
    States are zero-indexed.
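
    Examples
    --------
    A usage sketch; the series here is a random placeholder rather than
    a meaningful dataset:

    >>> import numpy as np
    >>> y = np.random.randn(150)
    >>> mod = MAR(y, order=4, nstates=2)
    >>> mod.nparams  # 2**2 transition probs + 4 AR params + 2 sds + 2 means
    12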
"""
    def __init__(self, endog, order, nstates,
                 dates=None, freq=None, missing='none'):
        # "Immutable" properties
        self.nobs_initial = order
        self.nobs = endog.shape[0] - order
        self.order = order
        self.nstates = nstates
        # M**2 transition probabilities + k AR parameters
        # + M standard deviations + M means
        self.nparams = self.nstates**2 + self.order + 2*self.nstates

        # Make a copy of original datasets
        orig_endog = endog
        orig_exog = lagmat(orig_endog, order)

        # Create datasets / complete initialization: the first k
        # observations are consumed as presample lags
        endog = orig_endog[self.nobs_initial:]
        exog = orig_exog[self.nobs_initial:]
        super(MAR, self).__init__(endog, exog,
                                  hasconst=0, missing=missing)

        # Overwrite originals
        self.data.orig_endog = orig_endog
        self.data.orig_exog = orig_exog

        # Cache the matrix [y_t, y_{t-1}, ..., y_{t-k}]
        self.augmented = np.c_[endog, exog]
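
    # A shape sketch (illustrative): with 100 raw observations and order
    # k=4, self.endog has 96 elements, self.exog is the 96 x 4 matrix of
    # lags 1..4, and self.augmented is the 96 x 5 matrix whose t-th row
    # is [y_t, y_{t-1}, ..., y_{t-4}].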
    def transform(self, params):
        params = np.asarray(params)
        if params.shape[0] == self.nparams:  # if given all probabilities
            nprobs = self.nstates**2
        else:  # if the last row of the transition matrix was left off
            nprobs = self.nstates**2 - self.nstates
        # Probabilities: transform to always be in [0, 1)
        #probabilities = np.exp(params[:nprobs]) / (1 + np.exp(params[:nprobs]))
        probabilities = np.abs(params[:nprobs]) / (1 + np.abs(params[:nprobs]))
        # AR parameters: no transformation
        phi = params[nprobs:nprobs+self.order]
        # Standard deviations: transform to always be positive
        sigma = np.abs(params[nprobs+self.order:nprobs+self.order+self.nstates])
        # Means: no transformation
        means = params[-self.nstates:]
        return np.r_[probabilities, phi, sigma, means]

    def untransform(self, params):
        params = np.asarray(params)
        if params.shape[0] == self.nparams:  # if given all probabilities
            nprobs = self.nstates**2
        else:  # if the last row of the transition matrix was left off
            nprobs = self.nstates**2 - self.nstates
        # Probabilities: untransform to the unconstrained space; this is
        # the inverse of p = |x| / (1 + |x|), up to the sign of x
        #probabilities = np.log(params[:nprobs] / (1 - params[:nprobs]))
        probabilities = params[:nprobs] / (1 - params[:nprobs])
        # No other untransformations
        return np.r_[probabilities, params[nprobs:]]
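
    # A quick round-trip check of the probability transform (illustrative,
    # not part of the original gist): for x >= 0, p = |x| / (1 + |x|) lies
    # in [0, 1) and p / (1 - p) recovers x.
    #
    #     x = np.array([0.5, 2.0])
    #     p = np.abs(x) / (1 + np.abs(x))   # array([1/3, 2/3])
    #     assert np.allclose(p / (1 - p), x)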
    def loglike(self, params):
        '''
        Log-likelihood of the model evaluated at params
        '''
        return np.sum(self.loglikeobs(params))

    def loglikeobs(self, params):
        '''
        Log-likelihood of each observation, evaluated at params
        (also used by jac() below)
        '''
        params = self.transform(params)
        if params.shape[0] == self.nparams:  # if given all probabilities
            nprobs = self.nstates**2
        else:  # if the last row of the transition matrix was left off
            nprobs = self.nstates**2 - self.nstates
        (transition_vector, joint_probabilities,
         marginal_conditional_densities) = self.initialize_filter(
            params[:nprobs],                           # probabilities
            params[nprobs:self.order+nprobs],          # AR parameters
            params[self.order+nprobs:
                   self.order+nprobs+self.nstates],    # sds
            params[-self.nstates:]                     # means
        )
        marginal_densities, joint_probabilities = hamilton_filter(
            self.nobs, self.nstates, self.order,
            transition_vector, joint_probabilities,
            marginal_conditional_densities
        )
        return np.log(marginal_densities)

    def score(self, params):
        '''
        Gradient of log-likelihood evaluated at params
        '''
        kwds = {}
        kwds.setdefault('centered', True)
        return approx_fprime(params, self.loglike, **kwds).ravel()

    def jac(self, params, **kwds):
        '''
        Jacobian/Gradient of log-likelihood evaluated at params for each
        observation.
        '''
        #kwds.setdefault('epsilon', 1e-4)
        kwds.setdefault('centered', True)
        return approx_fprime(params, self.loglikeobs, **kwds)

    def hessian(self, params):
        '''
        Hessian of log-likelihood evaluated at params
        '''
        from statsmodels.tools.numdiff import approx_hess
        # need options for hess (epsilon)
        return approx_hess(params, self.loglike)
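
    # Since the score and hessian are numerical approximations, estimation
    # can proceed through the inherited LikelihoodModel.fit with a
    # quasi-Newton optimizer, e.g. (the start_params here are an arbitrary
    # assumption, not a recommended starting point):
    #
    #     res = mod.fit(start_params=np.ones(mod.nparams), method='bfgs')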
    def initialize_filter(self, probabilities, params, sds, means):
        """
        Calculate the joint probability of S_{t-1} = j and S_{t} = i
        for each pair of states (i, j) and for t in [1, T], given time
        t-1 information

        Parameters
        ----------
        probabilities : array-like
            A vector of probability values for the transition matrix.

            Notes about the transition matrix:

            The nstates x nstates Markov chain transition matrix.
            The [i,j]th element of the matrix corresponds to the
            probability of moving to the i-th state given that you are
            in the j-th state. This means that it is the columns that
            sum to one, i.e. the matrix is left stochastic.

            It looks like:

                | p11 p12 ... p1M |
                | p21  .       .  |
                |  .       .   .  |
                | pM1  ...    pMM |

            The vector of probabilities is assumed to be of the form:

                [p11, p21, ..., pM1, p12, p22, ..., pM2, ..., p1M, p2M, ..., pMM]

            (this is an M^2 length vector)

            In other words, it is a vectorization of the transition matrix
            obtained by "stacking" the columns. Given a transition matrix
            P (of the form described above), this vectorization can be
            achieved by:

                P.reshape((1, P.size), order='F')

            or

                P.reshape(-1, order='F')

            or

                P.ravel(order='F')

            or

                P.T.ravel()

            etc.

            If the vector provided is of length M*(M-1), it is assumed to
            be of the form:

                [p11, p21, ..., p(M-1)1, p12, p22, ..., p(M-1)2, ..., p1M, p2M, ..., p(M-1)M]

            i.e. it is assumed that the last row was omitted, and its
            elements should be calculated as:

                pM* = 1 - p1* - p2* - ... - p(M-1)*
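
            For example, with M=2 the full column-stacked vector
            [p11, p21, p12, p22] is recovered from [p11, p12] as
            [p11, 1-p11, p12, 1-p12].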
        Returns
        -------
        transition_vector : array-like
            A row-stacked (C-order) vectorization of the transition
            matrix, which is the layout the Cython filter indexes as
            transition_vector[i*nstates + j].
        joint_probabilities : array-like
            A (nobs+1) x nstates**order matrix, filled with zeros except
            for the first row, which is initialized using the
            unconditional probabilities drawn from the transition matrix.
        marginal_conditional_densities : array-like
            A nobs x nstates**(order+1) matrix, giving the marginal
            density of y_t conditional on lagged values of y_t and on the
            (k+1)-vector of states (S_t, ..., S_{t-k}).
        """
        # We can cache the calculation of initial joint probabilities
        nrows = probabilities.size // self.nstates
        transition_matrix = probabilities.reshape(
            (nrows, self.nstates),
            order='F'
        )
        # Handle the case where the last row was left off
        if nrows != self.nstates:
            transition_matrix = np.c_[
                transition_matrix.T, 1-transition_matrix.sum(0)
            ].T

        # Unconditional probabilities of the states, given a transition
        # matrix: solve [I - P; 1'] pi = [0; 1] by least squares
        A = np.r_[
            np.eye(self.nstates) - transition_matrix,
            np.ones((1, self.nstates))
        ]
        pi = np.linalg.inv(A.T.dot(A)).dot(A.T)[:, -1]

        # Joint probabilities (of states): (nobs+1) x (M x ... x M),
        # ndim = k+1. The time dimension is nobs+1 because the 0th joint
        # probability is the input (calculated from the unconditional
        # probabilities) for the first iteration of the algorithm, which
        # starts at time t=1
        joint_probabilities = np.zeros((self.nobs+1,
                                        self.nstates**self.order))
        # The initial values for the joint probabilities of each length-k
        # sequence of states are calculated from the unconditional
        # probabilities
        # Note: the joint probability considers the k most recent states
        # The order of the state sequences is lexicographic, increasing
        transition_vector = transition_matrix.reshape(
            (1, transition_matrix.size)
        ).squeeze()
        conditional_probabilities = [
            np.tile(
                transition_vector.repeat(self.nstates**(self.order-2-i)),
                self.nstates**i
            )[:, None]  # need to add the second dimension to concatenate
            for i in range(self.order-1)  # k-1 values; 0=first, k-2=last
        ]
        unconditional_probabilities = np.tile(
            pi, self.nstates**(self.order-1)
        )[:, None]
        joint_probabilities[0] = reduce(np.multiply,
            conditional_probabilities + [unconditional_probabilities]
        ).squeeze()

        # Setup to calculate the marginal conditional densities:
        # x_t = y_t - phi_1 y_{t-1} - ... - phi_k y_{t-k}
        params = np.r_[1, -params]
        # The order of the states is lexicographic, increasing
        x = self.augmented.dot(params)
        # mu_{S_t} - phi_1 mu_{S_{t-1}} - ... - phi_k mu_{S_{t-k}}, for
        # each of the M^(k+1) state sequences
        loc = np.dot(
            np.concatenate([
                np.tile(
                    means.repeat(self.nstates**(self.order-i)),
                    self.nstates**i
                )[:, None]
                for i in range(self.order+1)  # k+1 values; 0=first, k=last
            ], 1),
            params
        )
        # The standard deviation depends only on the current state S_t
        scale = np.asarray(sds).repeat(self.nstates**self.order)

        # Compute the marginal conditional densities: the normal pdf of
        # the demeaned autoregression, for each time period and each of
        # the M^(k+1) state sequences
        var = np.tile(scale**2, self.nobs)
        marginal_conditional_densities = (
            (1 / np.sqrt(2*np.pi*var)) * np.exp(
                -((np.repeat(x, self.nstates**(self.order+1))
                   - np.tile(loc, self.nobs))**2) / (2*var)
            )
        ).reshape((self.nobs, self.nstates**(self.order+1)))

        return (transition_vector,
                joint_probabilities,
                marginal_conditional_densities)
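
# A quick check of the unconditional-probability computation used in
# MAR.initialize_filter (illustrative, not part of the original gist):
# the stationary distribution pi solves [I - P; 1'] pi = [0; 1] by least
# squares.
#
#     P = np.array([[0.9, 0.2],
#                   [0.1, 0.8]])  # column-stochastic transition matrix
#     A = np.r_[np.eye(2) - P, np.ones((1, 2))]
#     pi = np.linalg.inv(A.T.dot(A)).dot(A.T)[:, -1]
#     # pi is approximately [2/3, 1/3]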