A Python-based implementation of the Poisson Hidden Markov Model and a tutorial on how to build and train it on the US manufacturing strikes data set.
import math
import numpy as np
import statsmodels.api as sm
from statsmodels.base.model import GenericLikelihoodModel
from scipy.stats import poisson
from patsy import dmatrices
import statsmodels.graphics.tsaplots as tsa
from matplotlib import pyplot as plt
from statsmodels.tools.numdiff import approx_hess1, approx_hess2, approx_hess3
#Create a class that extends the GenericLikelihoodModel class so that we can train the model
# using a custom log-likelihood function
class PoissonHMM(GenericLikelihoodModel):
    def __init__(self, endog, exog, k_regimes=2, loglike=None, score=None, hessian=None,
                 missing='none', extra_params_names=None, **kwds):
        super(PoissonHMM, self).__init__(endog=endog, exog=exog, loglike=loglike, score=score,
                                         hessian=hessian, missing=missing,
                                         extra_params_names=extra_params_names, **kwds)
        self.y = np.array(self.endog)
        self.k_regimes = k_regimes
        #k_regimes x exog.shape[1] matrix of regime-specific regression coefficients
        self.beta_matrix = np.ones([self.k_regimes, self.exog.shape[1]])
        #k x k matrix of pseudo transition probabilities which can range from -inf to +inf
        # during optimization. Initialized to 1.0/k
        self.q_matrix = np.ones([self.k_regimes, self.k_regimes])*(1.0/self.k_regimes)
        print('self.q_matrix='+str(self.q_matrix))
        #The regime-wise matrix of Poisson means. These will be updated during the optimization
        # loop
        self.mu_matrix = []
        #k x k matrix of the real Markov transition probabilities, which will be calculated from
        # the q-matrix using a standardization technique. Initialized to 1.0/k
        self.gamma_matrix = np.ones([self.k_regimes, self.k_regimes])*(1.0/self.k_regimes)
        print('self.gamma_matrix='+str(self.gamma_matrix))
        #The Markov state probabilities. Also referred to as pi, but we'll use delta since pi is
        # often used to refer to the mean
        self.delta_matrix = np.ones([self.exog.shape[0], self.k_regimes])*(1.0/self.k_regimes)
        print('self.delta_matrix='+str(self.delta_matrix))
        #The vector of initial values for all the parameters, beta and q, that the optimizer
        # will optimize
        self.start_params = np.repeat(np.ones(self.exog.shape[1]), repeats=self.k_regimes)
        self.start_params = np.append(self.start_params, self.q_matrix.flatten())
        print('self.start_params='+str(self.start_params))
        #A very tiny machine-specific number (float64 epsilon). Used by the LL function.
        self.EPS = np.finfo(np.float64).eps
        #Optimization iteration counter
        self.iter_num = 0
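    #A worked example of the flat parameter vector the optimizer sees (an explanatory
    # aside added for clarity): with k_regimes=2 and 4 regression variables
    # (Intercept, output, ln_strikes_adj_lag1, d1, as in the tutorial below), params
    # has length 2*4 + 2*2 = 12 and is laid out as
    #   params[0:4]  -> beta coefficients for regime 0
    #   params[4:8]  -> beta coefficients for regime 1
    #   params[8:12] -> the q-matrix, flattened row-major
    # reconstitute_parameter_matrices() below slices it back apart in exactly this order.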
    #This method is called by the optimizer once in each iteration to get the current value of
    # the log-likelihood corresponding to the current values of all the parameters.
    def nloglikeobs(self, params):
        #Reconstitute the q and beta matrices from the current values of all the params
        self.reconstitute_parameter_matrices(params)
        #Build the regime-wise matrix of Poisson means
        self.compute_regime_specific_poisson_means()
        #Build the matrix of Markov transition probabilities by standardizing all the q values
        # to the 0 to 1 range
        self.compute_markov_transition_probabilities()
        #Build the (len(y) x k) matrix delta of Markov state probabilities: k state
        # probabilities, corresponding to the k regimes, for each time step (i.e. observation)
        self.compute_markov_state_probabilities()
        #Increment the iteration count
        self.iter_num = self.iter_num + 1
        #Compute all the log-likelihood values for the Poisson Markov model
        ll = self.compute_loglikelihood()
        #Print out the iteration summary
        print('ITER='+str(self.iter_num)+' ll='+str((-ll).sum(0)))
        #Return the negated array of log-likelihood values; the optimizer minimizes their sum
        return -ll
    #Reconstitute the q and beta matrices from the current values of all the params
    def reconstitute_parameter_matrices(self, params):
        self.q_matrix = params[-(self.k_regimes**2):]
        self.q_matrix = self.q_matrix.reshape([self.k_regimes, self.k_regimes])
        self.beta_matrix = params[:-(self.k_regimes**2)]
        self.beta_matrix = self.beta_matrix.reshape([self.k_regimes, self.exog.shape[1]])

    #Build the regime-wise matrix of Poisson means
    def compute_regime_specific_poisson_means(self):
        self.mu_matrix = []
        for j in range(self.k_regimes):
            #Fetch the regression coefficients vector corresponding to the jth regime
            beta_j = self.beta_matrix[j]
            #Compute the jth Poisson mean vector as mu_j = exp(X * beta_j)
            mu_j = np.exp(self.exog.dot(beta_j))
            if len(self.mu_matrix) == 0:
                self.mu_matrix = mu_j
            else:
                self.mu_matrix = np.vstack((self.mu_matrix, mu_j))
        self.mu_matrix = self.mu_matrix.transpose()
    #Build the matrix of Markov transition probabilities by standardizing all the q values to
    # the 0 to 1 range
    def compute_markov_transition_probabilities(self):
        for i in range(self.k_regimes):
            denom = 1
            for r in range(self.k_regimes-1):
                denom += math.exp(-self.q_matrix[i][r])
            for j in range(self.k_regimes-1):
                self.gamma_matrix[i][j] = math.exp(-self.q_matrix[i][j])/denom
            self.gamma_matrix[i][self.k_regimes-1] = 1.0/denom
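    #The transform above is a logistic-style standardization (an explanatory aside):
    #   gamma[i][j]   = exp(-q[i][j]) / (1 + sum_{r<k-1} exp(-q[i][r]))   for j < k-1
    #   gamma[i][k-1] = 1 / (1 + sum_{r<k-1} exp(-q[i][r]))
    # so every row of gamma is a valid probability distribution regardless of the
    # real values the optimizer assigns to q. For example, with k=2 and q[i][0]=0:
    # denom = 1 + exp(0) = 2, so gamma[i] = [0.5, 0.5].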
    #Build the (len(y) x k) matrix delta of Markov state probabilities: k state
    # probabilities, corresponding to the k regimes, for each time step (i.e. observation)
    def compute_markov_state_probabilities(self):
        #delta[0] keeps its uniform initialization; each subsequent row is propagated
        # forward through the transition matrix: delta[t] = delta[t-1] @ gamma
        for t in range(1, len(self.y)):
            self.delta_matrix[t] = np.matmul(self.delta_matrix[t-1], self.gamma_matrix)
    #Compute all the log-likelihood values for the Poisson Markov model
    def compute_loglikelihood(self):
        #Init the list of log-likelihood values, one value for each y observation
        ll = []
        for t in range(len(self.y)):
            prob_y_t = 0
            mu_t = 0
            for j in range(self.k_regimes):
                #To use the law of total probability instead, uncomment the next line and
                # comment out the mu_t accumulation below and the pmf call after this loop
                #prob_y_t += poisson.pmf(self.y[t], self.mu_matrix[t][j]) * self.delta_matrix[t][j]
                #Calculate the Poisson mean mu_t as an expectation over all Markov state
                # probabilities
                mu_t += self.mu_matrix[t][j] * self.delta_matrix[t][j]
            prob_y_t = poisson.pmf(self.y[t], mu_t)
            #This is a bit of a kludge. If the likelihood turns out to be really tiny, clamp
            # it to the machine epsilon so that its log stays finite
            if prob_y_t < self.EPS:
                prob_y_t = self.EPS
            #Push the LL into the list of LLs
            ll.append(math.log(prob_y_t))
        ll = np.array(ll)
        return ll
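    #In symbols (an explanatory aside): the code above scores each observation against
    # a single regime-averaged mean,
    #   mu_t = sum_j delta[t][j] * mu[t][j],  ll_t = ln(Poisson(y_t; mu_t)),
    # whereas the commented-out 'law of total probability' line would instead average
    # the regime-wise Poisson probabilities:
    #   P(y_t) = sum_j Poisson(y_t; mu[t][j]) * delta[t][j].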
    #This function tries its best to compute an invertible Hessian so that the standard
    # errors and confidence intervals of all the trained parameters can be computed
    # successfully.
    def hessian(self, params):
        for approx_hess_func in [approx_hess3, approx_hess2, approx_hess1]:
            H = approx_hess_func(x=params, f=self.loglike, epsilon=self.EPS)
            #Accept H only if its condition number is small enough for it to be
            # numerically invertible
            if np.linalg.cond(H) < 1/self.EPS:
                print('Found invertible hessian using '+str(approx_hess_func))
                return H
        #None of the approximations was well-conditioned. As a last resort, replace the
        # zero elements of the last Hessian with eps and return it
        print('DID NOT find invertible hessian')
        H[H == 0.0] = self.EPS
        return H
#Download the manufacturing strikes data set from the R datasets repository
strikes_dataset = sm.datasets.get_rdataset(dataname='StrikeNb', package='Ecdat')

#Get it as a Pandas DataFrame
strikes_data = strikes_dataset.data.copy()

#Print out the data set
print(strikes_dataset.data)

#Plot the number of strikes beginning each month
plt.xlabel('Month index')
plt.ylabel('Number of strikes beginning each month')
strikes_data['strikes'].plot()
plt.show()

#Plot the change in manufacturing activity (departure from the trend line) in each month
fig = plt.figure()
fig.suptitle('Change in US manufacturing activity (departure from trend line)')
plt.xlabel('Month index')
plt.ylabel('')
strikes_data['output'].plot()
plt.show()

#Plot the auto-correlation plot of the dependent variable 'strikes'
tsa.plot_acf(strikes_data['strikes'], alpha=0.05)
plt.show()

#Plot the partial auto-correlation plot of the dependent variable 'strikes'
tsa.plot_pacf(strikes_data['strikes'], alpha=0.05)
plt.show()
#Since there is a strong correlation at lag 1, add the lag-1 copy of strikes as a
# regression variable. We will actually add ln(strikes_lag1) so as to avoid
# 'model explosion' when the coefficient is positive
strikes_data['strikes_lag1'] = strikes_data['strikes'].shift(1)

#Drop rows with empty cells
strikes_data = strikes_data.dropna()

#Create the indicator function for calculating the value of the indicator
# variable d1 as follows: if strikes_lag1 == 0, d1 = 1, else d1 = 0
def indicator_func(x):
    if x == 0:
        return 1
    else:
        return 0

#Add the column for d1
strikes_data['d1'] = strikes_data['strikes_lag1'].apply(indicator_func)
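#An equivalent vectorized alternative to the apply() call above (an aside; either
# form yields the same 0/1 column):
#   strikes_data['d1'] = (strikes_data['strikes_lag1'] == 0).astype(int)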
#Adjust the lagged strikes variable so that it is set to 1 when its value is 0
strikes_data['strikes_adj_lag1'] = np.maximum(1, strikes_data['strikes_lag1'])

#Add the natural log of strikes_adj_lag1 as a regression variable
strikes_data['ln_strikes_adj_lag1'] = np.log(strikes_data['strikes_adj_lag1'])
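#Why the log transform helps (an explanatory aside): with ln(strikes_adj_lag1) as a
# regressor with coefficient rho, the Poisson mean becomes
#   mu_t = exp(x_t.beta + rho*ln(y_adj_lag1)) = exp(x_t.beta) * y_adj_lag1**rho,
# i.e. the lagged count enters as a power rather than inside the exponent, which
# keeps the mean from growing explosively when rho > 0.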
#Form the regression expression. The placeholder for the intercept column will be
# added automatically by Patsy
expr = 'strikes ~ output + ln_strikes_adj_lag1 + d1'

#Use Patsy to carve out the y and X matrices
y_train, X_train = dmatrices(expr, strikes_data, return_type='dataframe')

#Let's look at how our X and y matrices have turned out
print(y_train)
print(X_train)
#We'll experiment with a 2-state HMM, with the consequent assumption that the data cycles
# through 2 distinct regimes, each of which influences the mean of the Poisson process
k_regimes = 2

#There will be len(X_train.columns) regression coefficients per regime to be sent into the
# model for optimization, i.e. len(X_train.columns) * k_regimes beta coefficients in all to
# be optimized.
extra_params_names = []

#The columns corresponding to one regime are already baked into X_train in the form of the
# regression parameters, so we add names only for the remaining (k_regimes - 1) regimes
for regime_num in range(1, k_regimes):
    for param_name in X_train.columns:
        extra_params_names.append(param_name+'_R'+str(regime_num))

#The model will also optimize the k x k matrix of pseudo transition probabilities, so send
# those too into the extra_params vector
for i in range(k_regimes):
    for j in range(k_regimes):
        extra_params_names.append('q'+str(i)+str(j))
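#For k_regimes=2 the loops above produce the following names (a worked example,
# given Patsy's default 'Intercept' column name):
# ['Intercept_R1', 'output_R1', 'ln_strikes_adj_lag1_R1', 'd1_R1',
#  'q00', 'q01', 'q10', 'q11']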
#Create an instance of the PoissonHMM model class
poisson_hmm = PoissonHMM(y_train, X_train, k_regimes=k_regimes,
                         extra_params_names=extra_params_names)

#Train the model
poisson_hmm_results = poisson_hmm.fit(method='bfgs', maxiter=1000)

#Print the fitted params
print('fitted params='+str(poisson_hmm_results.params))

#Also print out the matrix of Markov transition probabilities
print(poisson_hmm.gamma_matrix)

#Print the model training summary
print(poisson_hmm_results.summary())
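#A minimal follow-up sketch (not part of the original tutorial): visualize the fitted
# Markov state probabilities that the model stored in delta_matrix. The labels are
# illustrative.
for j in range(k_regimes):
    plt.plot(poisson_hmm.delta_matrix[:, j], label='Regime '+str(j))
plt.xlabel('Month index')
plt.ylabel('Markov state probability')
plt.legend()
plt.show()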