Created
September 15, 2013 12:08
-
-
Save ChadFulton/6570217 to your computer and use it in GitHub Desktop.
Markov Switching Models - first pass at the Filter / Maximum Likelihood Estimation.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
cimport numpy as np | |
cimport cython | |
DTYPE = np.float64 | |
ctypedef np.float64_t dtype_t | |
@cython.boundscheck(False)
@cython.wraparound(False)
def hamilton_filter(int nobs,
                    int nstates,
                    int order,
                    np.ndarray[dtype_t, ndim = 1] transition_vector not None,
                    np.ndarray[dtype_t, ndim = 2, mode='c'] joint_probabilities not None,
                    np.ndarray[dtype_t, ndim = 2, mode='c'] marginal_conditional_densities not None):
    """
    Hamilton (1989) filter for an M-state, order-k Markov-switching model.

    Parameters
    ----------
    nobs : int
        Number of (post-presample) observations.
    nstates : int
        Number of Markov states, M.
    order : int
        Autoregressive order, k.
    transition_vector : ndarray, shape (M**2,)
        Vectorized transition matrix; element [i*M + j] is used as the
        transition weight for moving from state j to state i.
    joint_probabilities : ndarray, shape (nobs+1, M**k)
        Row 0 holds the initial joint state probabilities; rows 1..nobs are
        filled in by the filter (row t = filtered joint probabilities at t).
    marginal_conditional_densities : ndarray, shape (nobs, M**(k+1))
        Density of y_t conditional on each length-(k+1) state sequence.

    Returns
    -------
    marginal_densities : ndarray, shape (nobs,)
        f(y_t | past) for each t; the log-likelihood is sum(log(.)).
    joint_probabilities : ndarray
        The input array with rows 1..nobs filled in.

    Notes
    -----
    The previous implementation rebound the scratch buffer to a *view* of
    ``joint_probabilities[t-1]`` and then updated it in place, which
    clobbered the caller's row t-1 each iteration (shifting every row of
    the returned matrix and destroying the initial row). The filter now
    reads the input row directly and accumulates into the separately
    allocated scratch array, so the likelihood is unchanged but the
    probability output is correct.
    """
    cdef np.ndarray[dtype_t, ndim = 1] joint_densities, marginal_densities
    cdef np.ndarray[dtype_t, ndim = 1] _joint_probabilities, _marginal_conditional_densities
    cdef int nstatesk_1, nstatesk, nstatesk1, t, i, j, k, idx
    cdef dtype_t transition

    # Sizes of the state-sequence spaces: M**(k-1), M**k, M**(k+1)
    nstatesk_1 = nstates**(order-1)
    nstatesk = nstatesk_1*nstates
    nstatesk1 = nstatesk*nstates

    joint_densities = np.zeros((nstatesk1,))
    marginal_densities = np.zeros((nobs,))
    # Scratch buffer for the collapsed joint probabilities at time t
    _joint_probabilities = np.zeros((nstatesk,))

    for t in range(1, nobs+1):
        _marginal_conditional_densities = marginal_conditional_densities[t-1]

        # Joint density of (y_t, S_t=i, S_{t-1}=j, older states) and the
        # marginal density f(y_t | past) as the sum over all sequences.
        for i in range(nstates):
            for j in range(nstates):
                transition = transition_vector[i*nstates + j]
                for k in range(nstatesk_1):
                    idx = j*nstatesk_1 + k
                    joint_densities[i*nstatesk + idx] = (
                        transition
                        * joint_probabilities[t-1, idx]
                        * _marginal_conditional_densities[i*nstatesk + idx]
                    )
                    marginal_densities[t-1] += joint_densities[i*nstatesk + idx]

        # Collapse: divide by the marginal density and sum out the oldest
        # state (consecutive groups of M entries share all newer states).
        for i in range(nstatesk):
            _joint_probabilities[i] = 0
            idx = i*nstates
            for j in range(nstates):
                _joint_probabilities[i] += joint_densities[idx+j] / marginal_densities[t-1]
        joint_probabilities[t] = _joint_probabilities

    return marginal_densities, joint_probabilities
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Markov Autoregressive Models | |
Author: Chad Fulton | |
License: BSD | |
References | |
---------- | |
Hamilton, James D. 1989. | |
"A New Approach to the Economic Analysis of | |
Nonstationary Time Series and the Business Cycle." | |
Econometrica 57 (2) (March 1): 357-384. | |
Hamilton, James D. 1994. | |
Time Series Analysis. | |
Princeton, N.J.: Princeton University Press. | |
Kim, Chang-Jin, and Charles R. Nelson. 1999. | |
"State-Space Models with Regime Switching: | |
Classical and Gibbs-Sampling Approaches with Applications". | |
MIT Press Books. The MIT Press. | |
""" | |
from __future__ import division | |
import numpy as np | |
import pandas as pd | |
import statsmodels.tsa.base.tsa_model as tsbase | |
import statsmodels.base.model as base | |
from statsmodels.base import data | |
from statsmodels.tsa.tsatools import add_constant, lagmat | |
from statsmodels.regression.linear_model import OLS, OLSResults | |
from statsmodels.tools.numdiff import approx_fprime | |
from statsmodels.tools.decorators import (cache_readonly, cache_writable, | |
resettable_cache) | |
import statsmodels.base.wrapper as wrap | |
from scipy import stats | |
from mar_c import hamilton_filter | |
class MAR(base.LikelihoodModel):
    """
    An autoregressive model of order k with first-order, M-state
    Markov-switching mean and variance.

    Parameters
    ----------
    endog : array-like
        The endogenous variable. Assumed not to be in deviation-from-mean form.
    order : integer
        The order of the autoregressive parameters.
    nstates : integer
        The number of states in the Markov chain.

    References
    ----------
    Kim, Chang-Jin, and Charles R. Nelson. 1999.
    "State-Space Models with Regime Switching:
    Classical and Gibbs-Sampling Approaches with Applications".
    MIT Press Books. The MIT Press.

    Notes
    -----
    States are zero-indexed.
    """

    def __init__(self, endog, order, nstates,
                 dates=None, freq=None, missing='none'):
        # "Immutable" properties
        # The first `order` observations are consumed as presample lags.
        self.nobs_initial = order
        self.nobs = endog.shape[0] - order
        self.order = order
        self.nstates = nstates
        # M^2 transition probabilities + k AR coefficients
        # + M standard deviations + M means
        self.nparams = self.nstates**2 + self.order + 2*self.nstates

        # Make a copy of original datasets
        orig_endog = endog
        orig_exog = lagmat(orig_endog, order)

        # Trim the presample period / complete initialization
        endog = orig_endog[self.nobs_initial:]
        exog = orig_exog[self.nobs_initial:]
        super(MAR, self).__init__(endog, exog,
                                  hasconst=0, missing=missing)

        # Overwrite originals
        self.data.orig_endog = orig_endog
        self.data.orig_exog = orig_exog

        # Cache the augmented [y_t, y_{t-1}, ..., y_{t-k}] matrix used when
        # evaluating the conditional densities in `initialize_filter`.
        self.augmented = np.c_[endog, exog]

    def transform(self, params):
        """
        Map unconstrained optimizer parameters into the constrained space.

        Parameters
        ----------
        params : array-like
            Parameters laid out as [transition probabilities, AR
            coefficients, standard deviations, means]. The probability
            portion has length M^2, or M*(M-1) if the last row of the
            transition matrix was left off.

        Returns
        -------
        ndarray with probabilities mapped into (0, 1) and standard
        deviations made positive; AR coefficients and means untouched.
        """
        params = np.asarray(params)
        if params.shape[0] == self.nparams:  # if given all probabilities
            nprobs = self.nstates**2
        else:                                # if last row left off
            nprobs = self.nstates**2 - self.nstates
        # Probabilities: transform to always be in (0, 1)
        #probabilities = np.exp(params[:nprobs]) / (1 + np.exp(params[:nprobs]))
        probabilities = np.abs(params[:nprobs]) / (1 + np.abs(params[:nprobs]))
        # AR parameters: no transformation
        phi = params[nprobs:nprobs+self.order]
        # Standard deviations: transform to always be positive
        sigma = np.abs(params[nprobs+self.order:nprobs+self.order+self.nstates])
        # Means: no transformation
        means = params[-self.nstates:]
        return np.r_[probabilities, phi, sigma, means]

    def untransform(self, params):
        """
        Inverse of `transform` on the probability portion (valid for the
        non-negative branch of the absolute-value transform used there).
        """
        params = np.asarray(params)
        if params.shape[0] == self.nparams:  # if given all probabilities
            nprobs = self.nstates**2
        else:                                # if last row left off
            nprobs = self.nstates**2 - self.nstates
        # Probabilities: untransform to always be in (-Inf, Inf)
        #probabilities = np.log(params[:nprobs] / (1 - params[:nprobs]))
        probabilities = params[:nprobs] / (1 - params[:nprobs])
        # No other untransformations
        return np.r_[probabilities, params[nprobs:]]

    def loglike(self, params):
        """
        Log-likelihood at (unconstrained) `params`: run the Hamilton filter
        and sum the log marginal densities of the observations.
        """
        params = self.transform(params)
        if params.shape[0] == self.nparams:  # if given all probabilities
            nprobs = self.nstates**2
        else:                                # if last row left off
            nprobs = self.nstates**2 - self.nstates
        (transition_vector, joint_probabilities,
         marginal_conditional_densities) = self.initialize_filter(
            params[:nprobs],                                           # probabilities
            params[nprobs:self.order+nprobs],                          # AR parameters
            params[self.order+nprobs:self.order+nprobs+self.nstates],  # sds
            params[-self.nstates:]                                     # means
        )
        marginal_densities, joint_probabilities = hamilton_filter(
            self.nobs, self.nstates, self.order,
            transition_vector, joint_probabilities,
            marginal_conditional_densities
        )
        return np.sum(np.log(marginal_densities))

    def score(self, params):
        '''
        Gradient of log-likelihood evaluated at params (numerical,
        centered finite differences).
        '''
        kwds = {}
        kwds.setdefault('centered', True)
        return approx_fprime(params, self.loglike, **kwds).ravel()

    def jac(self, params, **kwds):
        '''
        Jacobian/Gradient of log-likelihood evaluated at params for each
        observation.

        NOTE(review): relies on `self.loglikeobs`, which is not defined in
        this class -- presumably intended to be added (per-observation
        log-likelihood); verify before use.
        '''
        #kwds.setdefault('epsilon', 1e-4)
        kwds.setdefault('centered', True)
        return approx_fprime(params, self.loglikeobs, **kwds)

    def hessian(self, params):
        '''
        Hessian of log-likelihood evaluated at params (numerical).
        '''
        from statsmodels.tools.numdiff import approx_hess
        # need options for hess (epsilon)
        return approx_hess(params, self.loglike)

    def initialize_filter(self, probabilities, params, sds, means):
        """
        Calculate the joint probability of S_{t-1} = j and S_{t} = i
        for j=0,1; i=0,1; and t in [1,T], given time t-1 information

        Parameters
        ----------
        probabilities : array-like
            A vector of probability values for the transition matrix.

            Notes about the transition matrix:
            The nstates x nstates Markov chain transition matrix.
            The [i,j]th element of the matrix corresponds to the
            probability of moving to the i-th state given that you are
            in the j-th state. This means that it is the columns that
            sum to one, aka that the matrix is left stochastic.
            It looks like:

                | p11 p12 ... p1M |
                | p21  .       .  |
                |  .       .   .  |
                | pM1  ...    pMM |

            The vector of probabilities is assumed to be of the form:
            [p11, p21, ..., pM1, p12, p22, ..., pM2, ..., p1M, p2M, ..., pMM]
            (this is an M^2 length vector)
            In other words, it is a vectorization of the transition matrix
            obtained by "stacking" the columns. Given a transition matrix P
            (of the form described above), this vectorization can be
            achieved by:

                P.reshape(-1, order='F')  (equivalently P.T.ravel(), etc.)

            If the vector provided is of length M*(M-1), it is assumed to be
            of the form:
            [p11, p21, ..., p(M-1)1, p12, ..., p(M-1)2, ..., p1M, ..., p(M-1)M]
            i.e. it is assumed that the last row was omitted, and its
            elements are reconstructed as pM* = 1 - p1* - p2* - ... - p(M-1)*.
        params : array-like
            The AR coefficients phi_1, ..., phi_k.
        sds : array-like
            The M state-specific standard deviations.
        means : array-like
            The M state-specific means.

        Returns
        -------
        transition_vector : array-like
            A vectorization of the (reconstructed) transition matrix.
        joint_probabilities : array-like
            A nobs+1 x nstates**k matrix, filled with zeros except for the
            first row, which is initialized using the unconditional
            probabilities drawn from the transition matrix.
        marginal_conditional_densities : array-like
            A nobs x nstates**(k+1) matrix, giving the marginal density
            of y_t conditional on lagged values of y_t and on the k+1 vector
            of states.
        """
        # We can cache the calculation of initial joint probabilities.
        # Integer division: probabilities.size is M^2 or M*(M-1). True
        # division (active via `from __future__ import division`) would
        # produce a float here, which is invalid as a reshape dimension.
        nrows = probabilities.size // self.nstates
        transition_matrix = probabilities.reshape(
            (nrows, self.nstates),
            order='F'
        )
        # Handle the case where the last row was left off: each column must
        # sum to one, so the missing row is 1 minus the column sums.
        if nrows != self.nstates:
            transition_matrix = np.c_[
                transition_matrix.T, 1-transition_matrix.sum(0)
            ].T
        # Unconditional probabilities of the states, given a transition
        # matrix: least-squares solution of [I - P; 1'] pi = [0; 1]
        A = np.r_[
            np.eye(self.nstates) - transition_matrix,
            np.ones((self.nstates, 1)).T
        ]
        pi = np.linalg.inv(A.T.dot(A)).dot(A.T)[:, -1]

        # Joint probabilities (of states): (nobs+1) x (M x ... x M), ndim = k+1
        # Its time dimension is nobs+1 because the 0th joint probability is
        # the input (calculated from the unconditional probabilities) for the
        # first iteration of the algorithm, which starts at time t=1
        joint_probabilities = np.zeros((self.nobs+1, self.nstates**self.order))
        # The initialized values for the joint probabilities of each length-k
        # sequence of states are calculated from the unconditional
        # probabilities. The order of the state sequences is lexicographic,
        # increasing.
        transition_vector = transition_matrix.reshape(
            (1, transition_matrix.size)
        ).squeeze()
        conditional_probabilities = [
            np.tile(
                transition_vector.repeat(self.nstates**(self.order-2-i)),
                self.nstates**i
            )[:, None]  # need the second dimension to broadcast as a column
            for i in range(self.order-1)  # k-1 values; 0=first, k-2=last
        ]
        unconditional_probabilities = np.tile(
            pi, self.nstates**(self.order-1)
        )[:, None]
        # Elementwise product of the conditional chains with the
        # unconditional probabilities. (A loop is used instead of the
        # Python-2-only builtin `reduce`.)
        initial_joint = unconditional_probabilities
        for conditional in conditional_probabilities:
            initial_joint = initial_joint * conditional
        joint_probabilities[0] = initial_joint.squeeze()

        # Setup to calculate the marginal conditional densities:
        # params becomes [1, -phi_1, ..., -phi_k] so that
        # augmented.dot(params) = y_t - phi_1 y_{t-1} - ... - phi_k y_{t-k}
        params = np.r_[1, -params]
        # The order of the states is lexicographic, increasing
        x = self.augmented.dot(params)
        # State-sequence-specific location: mu_{s_t} - phi_1 mu_{s_{t-1}} - ...
        loc = np.dot(
            np.concatenate([
                np.tile(
                    means.repeat(self.nstates**(self.order-i)),
                    self.nstates**i
                )[:, None]
                for i in range(self.order+1)  # k+1 values; 0=first, k=last
            ], 1),
            params
        )
        # Scale depends only on the current state s_t (first index).
        scale = np.asarray(sds).repeat(self.nstates**(self.order))
        # Compute the marginal conditional densities (normal pdf of the
        # demeaned AR residual under each length-(k+1) state sequence).
        var = np.tile(scale**2, self.nobs)
        marginal_conditional_densities = (
            (1 / np.sqrt(2*np.pi*var)) * np.exp(
                -((np.repeat(x, self.nstates**(self.order+1)) - np.tile(loc, self.nobs))**2) / (2*var)
            )
        ).reshape((self.nobs, self.nstates**(self.order+1)))

        return (transition_vector,
                joint_probabilities,
                marginal_conditional_densities)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment