Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
A Python based implementation of the Poisson Hidden Markov Model and a tutorial on how to build and train it on the US manufacturing strikes data set.
import math
import numpy as np
import statsmodels.api as sm
from statsmodels.base.model import GenericLikelihoodModel
from scipy.stats import poisson
from patsy import dmatrices
import as tsa
from matplotlib import pyplot as plt
from import approx_hess1, approx_hess2, approx_hess3
#Create a class that extends the GenericLikelihoodModel class so that we can train the model
# using a custom log-likelihood function
class PoissonHMM(GenericLikelihoodModel):
def __init__(self, endog, exog, k_regimes=2, loglike=None, score=None, hessian=None,
missing='none', extra_params_names=None, **kwds):
super(PoissonHMM, self).__init__(endog=endog, exog=exog, loglike=loglike, score=score,
hessian=hessian, missing=missing,
extra_params_names=extra_params_names, kwds=kwds)
self.y = np.array(self.endog)
self.k_regimes = k_regimes
#k_regimes x exog.shape[1] size matrix of regime specific regression coefficients
self.beta_matrix = np.ones([self.k_regimes, self.exog.shape[1]])
# k x k matrix of psuedo transition probabilities which can range from -inf to +inf during
# optimization. Initialized to 1.0/k
self.q_matrix = np.ones([self.k_regimes,self.k_regimes])*(1.0/self.k_regimes)
#The regime wise matrix of Poisson means. These would be updated during the optimization
# loop
self.mu_matrix = []
# k x k matrix of the real Markov transition probabilities which will be calculated from
# the q-matrix using a standardization technique. Initialized to 1.0/k
self.gamma_matrix = np.ones([self.k_regimes, self.k_regimes])*(1.0/self.k_regimes)
# The Markov state probabilities. Also referred to as pi. but we'll use delta since pi is
# often used to refer to the mean
self.delta_matrix = np.ones([self.exog.shape[0],self.k_regimes])*(1.0/self.k_regimes)
#The vector of initial values for all the parameters, beta and q, that the optimizer will
# optimize
self.start_params = np.repeat(np.ones(self.exog.shape[1]), repeats=self.k_regimes)
self.start_params = np.append(self.start_params, self.q_matrix.flatten())
#A very tiny number (machine specific). Used by the LL function.
self.EPS = np.MachAr().eps
#Optimization iteration counter
#This method is called by the optimizer once in each iteration to get the current value of
# the log-likelihood corresponding to the current values of all the parameters.
def nloglikeobs(self, params):
#Reconstitute the q and beta matrices from the current values of all the params
#Build the regime wise matrix of Poisson means
#Build the matrix of Markov transition probabilities by standardizing all the q values to
# the 0 to 1 range
#Build the (len(y) x k) matrix delta of Markov state probabilities distribution. k state
# probabilities corresponding to k regimes, times, number of time steps (i.e. observations)
#Let's increment the iteration count
# Compute all the log-likelihood values for the Poisson Markov model
ll = self.compute_loglikelihood()
#Print out the iteration summary
print('ITER='+str(self.iter_num) + ' ll='+str(((-ll).sum(0))))
#Return the negated array of log-likelihood values
return -ll
# Reconstitute the q and beta matrices from the current values of all the params
def reconstitute_parameter_matrices(self, params):
self.q_matrix = params[-(self.k_regimes**2):]
self.q_matrix = self.q_matrix.reshape([self.k_regimes, self.k_regimes])
self.beta_matrix = params[:-(self.k_regimes**2)]
self.beta_matrix = self.beta_matrix.reshape([self.k_regimes, self.exog.shape[1]])
# Build the regime wise matrix of Poisson means
def compute_regime_specific_poisson_means(self):
self.mu_matrix = []
for j in range(self.k_regimes):
#Fetch the regression coefficients vector corresponding to the jth regime
beta_j = self.beta_matrix[j]
#Compute the Poisson mean mu as a dot product of X and Beta
mu_j = np.exp(
if len(self.mu_matrix) == 0:
self.mu_matrix = mu_j
self.mu_matrix = np.vstack((self.mu_matrix,mu_j))
self.mu_matrix = self.mu_matrix.transpose()
# Build the matrix of Markov transition probabilities by standardizing all the q values to
# the 0 to 1 range
def compute_markov_transition_probabilities(self):
for i in range(self.k_regimes):
denom = 1
for r in range(self.k_regimes-1):
denom += math.exp(-self.q_matrix[i][r])
for j in range(self.k_regimes-1):
self.gamma_matrix[i][j] = math.exp(-self.q_matrix[i][j])/denom
self.gamma_matrix[i][self.k_regimes-1] = 1.0/denom
# Build the (len(y) x k) matrix delta of Markov state probabilities distribution. k state
# probabilities corresponding to k regimes, times, number of time steps (i.e. observations)
def compute_markov_state_probabilities(self):
for t in range(1,len(self.y)):
self.delta_matrix[t] = np.matmul(self.delta_matrix[t-1], self.gamma_matrix)
# Compute all the log-likelihood values for the Poisson Markov model
def compute_loglikelihood(self):
#Init the list of loglikelihhod values, one value for each y observation
ll = []
for t in range(len(self.y)):
prob_y_t = 0
mu_t = 0
for j in range(self.k_regimes):
#To use the law of total probability, uncomment this row and comment out the next
# two rows
#prob_y_t += poisson.pmf(y[t], mu[t][j]) * self.delta_matrix[t][j]
#Calculate the Poisson mean mu_t as an expectation over all Markov state
# probabilities
mu_t += self.mu_matrix[t][j] * self.delta_matrix[t][j]
prob_y_t += poisson.pmf(self.y[t], mu_t)
#This is a bit of a kludge. If the likelihood turns out to be real tiny, fix it to
# the EPS value for the machine
if prob_y_t < self.EPS:
prob_y_t = self.EPS
#Push the LL into the list of LLs
ll = np.array(ll)
return ll
#This function just tries its best to compute an invertible Hessian so that the standard
# errors and confidence intervals of all the trained parameters can be computed successfully.
def hessian(self, params):
for approx_hess_func in [approx_hess3, approx_hess2, approx_hess1]:
H = approx_hess_func(x=params, f=self.loglike, epsilon=self.EPS)
if np.linalg.cond(H) < 1 / self.EPS:
print('Found invertible hessian using '+str(approx_hess_func))
return H
print('DID NOT find invertible hessian')
H[H == 0.0] = self.EPS
return H
#Download the manufacturing strikes data set from R datasets
strikes_dataset = sm.datasets.get_rdataset(dataname='StrikeNb', package='Ecdat')
#Get it as a Pandas DataFrame
strikes_data =
#Print out the data set
#Plot the number of strikes starting each month
plt.xlabel('Month index')
plt.ylabel('Number of strikes beginning each month')
#Plot the change in manufacturing activity (from trend line) in each month
fig = plt.figure()
fig.suptitle('Change in US manufacturing activity (departure from trend line)')
plt.xlabel('Month index')
#Plot the auto-correlation plot of the dependent variable 'strikes'
tsa.plot_acf(strikes_data['strikes'], alpha=0.05)
#Plot the partial auto-correlation plot of the dependent variable 'strikes'
tsa.plot_acf(strikes_data['strikes'], alpha=0.05)
#Since there is a strong correlation at lag-1, add the lag-1 copy of strikes
# as a regression variable. Actually, we will add ln(strikes_lag1) to avoid
# 'model explosion' when the coefficient is positive
strikes_data['strikes_lag1'] = strikes_data['strikes'].shift(1)
#Drop rows with empty cells
strikes_data = strikes_data.dropna()
#Create the indicator function for calculating the value of the indicator
# variable d1 as follows: if strikes == 0, d1 = 1, else d1 = 0
def indicator_func(x):
if x == 0:
return 1
return 0
#Add the column for d1
strikes_data['d1'] = strikes_data['strikes_lag1'].apply(indicator_func)
#Adjust the lagged strikes variable so that it is set to 1, when its value is 0
strikes_data['strikes_adj_lag1'] = np.maximum(1, strikes_data['strikes_lag1'])
#Add the natural log of strikes_lag1 as a regression variable
strikes_data['ln_strikes_adj_lag1'] = np.log(strikes_data['strikes_adj_lag1'])
#Form the regression expression. The placeholder for the intercept column will
# be added automatically by Patsy
expr = 'strikes ~ output + ln_strikes_adj_lag1 + d1'
#Use Patsy to carve out the y and X matrices
y_train, X_train = dmatrices(expr, strikes_data, return_type='dataframe')
#Let's look at how our X and y matrices have turned out
#We'll experiment with a 2-state HMM with the consequent assumption that the data cycles through
# 2 distinct regimes, each one of which influences the mean of the Poisson process
k_regimes = 2
#There will len(X_train.columns) number of regression coefficients per regime to be sent into
# the model for optimization. So len(X_train.columns) * k_regimes beta coefficients in all to
# be optimized.
extra_params_names = []
#The columns corresponding to one regime are already baked into X_train in the form of the
# regression parameters
for regime_num in range(1, k_regimes):
for param_name in X_train.columns:
#The model will also optimize the k x k matrix of pseudo transition probabilities. So send those
# too into the extra_params vector
for i in range(k_regimes):
for j in range(k_regimes):
#Create an instance of the PoissonHMM model class
poisson_hmm = PoissonHMM(y_train, X_train, k_regimes=k_regimes,
#Train the model
poisson_hmm_results ='bfgs', maxiter=1000)
#Print the fitted params
print('fitted params='+str(poisson_hmm_results.params))
#Also print out the Markov transition probabilities P:
#Print the model training summary
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment