Skip to content

Instantly share code, notes, and snippets.

@sachinsdate
Created November 26, 2021 11:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sachinsdate/038fd66b7de15aefdd2fae450f76e81f to your computer and use it in GitHub Desktop.
Save sachinsdate/038fd66b7de15aefdd2fae450f76e81f to your computer and use it in GitHub Desktop.
class PoissonHMM(GenericLikelihoodModel):
def __init__(self, endog, exog, k_regimes=2, loglike=None, score=None, hessian=None,
missing='none', extra_params_names=None, **kwds):
super(PoissonHMM, self).__init__(endog=endog, exog=exog, loglike=loglike, score=score,
hessian=hessian, missing=missing,
extra_params_names=extra_params_names, kwds=kwds)
self.y = np.array(self.endog)
self.k_regimes = k_regimes
#k_regimes x exog.shape[1] size matrix of regime specific regression coefficients
self.beta_matrix = np.ones([self.k_regimes, self.exog.shape[1]])
# k x k matrix of psuedo transition probabilities which can range from -inf to +inf during
# optimization. Initialized to 1.0/k
self.q_matrix = np.ones([self.k_regimes,self.k_regimes])*(1.0/self.k_regimes)
print('self.q_matrix='+str(self.q_matrix))
#The regime wise matrix of Poisson means. These would be updated during the optimization
# loop
self.mu_matrix = []
# k x k matrix of the real Markov transition probabilities which will be calculated from
# the q-matrix using a standardization technique. Initialized to 1.0/k
self.gamma_matrix = np.ones([self.k_regimes, self.k_regimes])*(1.0/self.k_regimes)
print('self.gamma_matrix='+str(self.gamma_matrix))
# The Markov state probabilities. Also referred to as pi. but we'll use delta since pi is
# often used to refer to the mean
self.delta_matrix = np.ones([self.exog.shape[0],self.k_regimes])*(1.0/self.k_regimes)
print('self.delta_matrix='+str(self.delta_matrix))
#The vector of initial values for all the parameters, beta and q, that the optimizer will
# optimize
self.start_params = np.repeat(np.ones(self.exog.shape[1]), repeats=self.k_regimes)
self.start_params = np.append(self.start_params, self.q_matrix.flatten())
print('self.start_params='+str(self.start_params))
#A very tiny number (machine specific). Used by the LL function.
self.EPS = np.MachAr().eps
#Optimization iteration counter
self.iter_num=0
#This method is called by the optimizer once in each iteration to get the current value of
# the log-likelihood corresponding to the current values of all the parameters.
def nloglikeobs(self, params):
#Reconstitute the q and beta matrices from the current values of all the params
self.reconstitute_parameter_matrices(params)
#Build the regime wise matrix of Poisson means
self.compute_regime_specific_poisson_means()
#Build the matrix of Markov transition probabilities by standardizing all the q values to
# the 0 to 1 range
self.compute_markov_transition_probabilities()
#Build the (len(y) x k) matrix delta of Markov state probabilities distribution. k state
# probabilities corresponding to k regimes, times, number of time steps (i.e. observations)
self.compute_markov_state_probabilities()
#Let's increment the iteration count
self.iter_num=self.iter_num+1
# Compute all the log-likelihood values for the Poisson Markov model
ll = self.compute_loglikelihood()
#Print out the iteration summary
print('ITER='+str(self.iter_num) + ' ll='+str(((-ll).sum(0))))
#Return the negated array of log-likelihood values
return -ll
# Reconstitute the q and beta matrices from the current values of all the params
def reconstitute_parameter_matrices(self, params):
self.q_matrix = params[-(self.k_regimes**2):]
self.q_matrix = self.q_matrix.reshape([self.k_regimes, self.k_regimes])
self.beta_matrix = params[:-(self.k_regimes**2)]
self.beta_matrix = self.beta_matrix.reshape([self.k_regimes, self.exog.shape[1]])
# Build the regime wise matrix of Poisson means
def compute_regime_specific_poisson_means(self):
self.mu_matrix = []
for j in range(self.k_regimes):
#Fetch the regression coefficients vector corresponding to the jth regime
beta_j = self.beta_matrix[j]
#Compute the Poisson mean mu as a dot product of X and Beta
mu_j = np.exp(self.exog.dot(beta_j))
if len(self.mu_matrix) == 0:
self.mu_matrix = mu_j
else:
self.mu_matrix = np.vstack((self.mu_matrix,mu_j))
self.mu_matrix = self.mu_matrix.transpose()
# Build the matrix of Markov transition probabilities by standardizing all the q values to
# the 0 to 1 range
def compute_markov_transition_probabilities(self):
for i in range(self.k_regimes):
denom = 1
for r in range(self.k_regimes-1):
denom += math.exp(-self.q_matrix[i][r])
for j in range(self.k_regimes-1):
self.gamma_matrix[i][j] = math.exp(-self.q_matrix[i][j])/denom
self.gamma_matrix[i][self.k_regimes-1] = 1.0/denom
# Build the (len(y) x k) matrix delta of Markov state probabilities distribution. k state
# probabilities corresponding to k regimes, times, number of time steps (i.e. observations)
def compute_markov_state_probabilities(self):
for t in range(1,len(self.y)):
self.delta_matrix[t] = np.matmul(self.delta_matrix[t-1], self.gamma_matrix)
# Compute all the log-likelihood values for the Poisson Markov model
def compute_loglikelihood(self):
#Init the list of loglikelihhod values, one value for each y observation
ll = []
for t in range(len(self.y)):
prob_y_t = 0
mu_t = 0
for j in range(self.k_regimes):
#To use the law of total probability, uncomment this row and comment out the next
# two rows
#prob_y_t += poisson.pmf(y[t], mu[t][j]) * self.delta_matrix[t][j]
#Calculate the Poisson mean mu_t as an expectation over all Markov state
# probabilities
mu_t += self.mu_matrix[t][j] * self.delta_matrix[t][j]
prob_y_t += poisson.pmf(self.y[t], mu_t)
#This is a bit of a kludge. If the likelihood turns out to be real tiny, fix it to
# the EPS value for the machine
if prob_y_t < self.EPS:
prob_y_t = self.EPS
#Push the LL into the list of LLs
ll.append(math.log(prob_y_t))
ll = np.array(ll)
return ll
#This function just tries its best to compute an invertible Hessian so that the standard
# errors and confidence intervals of all the trained parameters can be computed successfully.
def hessian(self, params):
for approx_hess_func in [approx_hess3, approx_hess2, approx_hess1]:
H = approx_hess_func(x=params, f=self.loglike, epsilon=self.EPS)
if np.linalg.cond(H) < 1 / self.EPS:
print('Found invertible hessian using '+str(approx_hess_func))
return H
print('DID NOT find invertible hessian')
H[H == 0.0] = self.EPS
return H
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment