This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class PoissonHMM(GenericLikelihoodModel): | |
def __init__(self, endog, exog, k_regimes=2, loglike=None, score=None, hessian=None, | |
missing='none', extra_params_names=None, **kwds): | |
super(PoissonHMM, self).__init__(endog=endog, exog=exog, loglike=loglike, score=score, | |
hessian=hessian, missing=missing, | |
extra_params_names=extra_params_names, kwds=kwds) | |
self.y = np.array(self.endog) | |
self.k_regimes = k_regimes | |
#k_regimes x exog.shape[1] size matrix of regime specific regression coefficients | |
self.beta_matrix = np.ones([self.k_regimes, self.exog.shape[1]]) | |
# k x k matrix of psuedo transition probabilities which can range from -inf to +inf during | |
# optimization. Initialized to 1.0/k | |
self.q_matrix = np.ones([self.k_regimes,self.k_regimes])*(1.0/self.k_regimes) | |
print('self.q_matrix='+str(self.q_matrix)) | |
#The regime wise matrix of Poisson means. These would be updated during the optimization | |
# loop | |
self.mu_matrix = [] | |
# k x k matrix of the real Markov transition probabilities which will be calculated from | |
# the q-matrix using a standardization technique. Initialized to 1.0/k | |
self.gamma_matrix = np.ones([self.k_regimes, self.k_regimes])*(1.0/self.k_regimes) | |
print('self.gamma_matrix='+str(self.gamma_matrix)) | |
# The Markov state probabilities. Also referred to as pi. but we'll use delta since pi is | |
# often used to refer to the mean | |
self.delta_matrix = np.ones([self.exog.shape[0],self.k_regimes])*(1.0/self.k_regimes) | |
print('self.delta_matrix='+str(self.delta_matrix)) | |
#The vector of initial values for all the parameters, beta and q, that the optimizer will | |
# optimize | |
self.start_params = np.repeat(np.ones(self.exog.shape[1]), repeats=self.k_regimes) | |
self.start_params = np.append(self.start_params, self.q_matrix.flatten()) | |
print('self.start_params='+str(self.start_params)) | |
#A very tiny number (machine specific). Used by the LL function. | |
self.EPS = np.MachAr().eps | |
#Optimization iteration counter | |
self.iter_num=0 | |
#This method is called by the optimizer once in each iteration to get the current value of | |
# the log-likelihood corresponding to the current values of all the parameters. | |
def nloglikeobs(self, params): | |
#Reconstitute the q and beta matrices from the current values of all the params | |
self.reconstitute_parameter_matrices(params) | |
#Build the regime wise matrix of Poisson means | |
self.compute_regime_specific_poisson_means() | |
#Build the matrix of Markov transition probabilities by standardizing all the q values to | |
# the 0 to 1 range | |
self.compute_markov_transition_probabilities() | |
#Build the (len(y) x k) matrix delta of Markov state probabilities distribution. k state | |
# probabilities corresponding to k regimes, times, number of time steps (i.e. observations) | |
self.compute_markov_state_probabilities() | |
#Let's increment the iteration count | |
self.iter_num=self.iter_num+1 | |
# Compute all the log-likelihood values for the Poisson Markov model | |
ll = self.compute_loglikelihood() | |
#Print out the iteration summary | |
print('ITER='+str(self.iter_num) + ' ll='+str(((-ll).sum(0)))) | |
#Return the negated array of log-likelihood values | |
return -ll | |
# Reconstitute the q and beta matrices from the current values of all the params | |
def reconstitute_parameter_matrices(self, params): | |
self.q_matrix = params[-(self.k_regimes**2):] | |
self.q_matrix = self.q_matrix.reshape([self.k_regimes, self.k_regimes]) | |
self.beta_matrix = params[:-(self.k_regimes**2)] | |
self.beta_matrix = self.beta_matrix.reshape([self.k_regimes, self.exog.shape[1]]) | |
# Build the regime wise matrix of Poisson means | |
def compute_regime_specific_poisson_means(self): | |
self.mu_matrix = [] | |
for j in range(self.k_regimes): | |
#Fetch the regression coefficients vector corresponding to the jth regime | |
beta_j = self.beta_matrix[j] | |
#Compute the Poisson mean mu as a dot product of X and Beta | |
mu_j = np.exp(self.exog.dot(beta_j)) | |
if len(self.mu_matrix) == 0: | |
self.mu_matrix = mu_j | |
else: | |
self.mu_matrix = np.vstack((self.mu_matrix,mu_j)) | |
self.mu_matrix = self.mu_matrix.transpose() | |
# Build the matrix of Markov transition probabilities by standardizing all the q values to | |
# the 0 to 1 range | |
def compute_markov_transition_probabilities(self): | |
for i in range(self.k_regimes): | |
denom = 1 | |
for r in range(self.k_regimes-1): | |
denom += math.exp(-self.q_matrix[i][r]) | |
for j in range(self.k_regimes-1): | |
self.gamma_matrix[i][j] = math.exp(-self.q_matrix[i][j])/denom | |
self.gamma_matrix[i][self.k_regimes-1] = 1.0/denom | |
# Build the (len(y) x k) matrix delta of Markov state probabilities distribution. k state | |
# probabilities corresponding to k regimes, times, number of time steps (i.e. observations) | |
def compute_markov_state_probabilities(self): | |
for t in range(1,len(self.y)): | |
self.delta_matrix[t] = np.matmul(self.delta_matrix[t-1], self.gamma_matrix) | |
# Compute all the log-likelihood values for the Poisson Markov model | |
def compute_loglikelihood(self): | |
#Init the list of loglikelihhod values, one value for each y observation | |
ll = [] | |
for t in range(len(self.y)): | |
prob_y_t = 0 | |
mu_t = 0 | |
for j in range(self.k_regimes): | |
#To use the law of total probability, uncomment this row and comment out the next | |
# two rows | |
#prob_y_t += poisson.pmf(y[t], mu[t][j]) * self.delta_matrix[t][j] | |
#Calculate the Poisson mean mu_t as an expectation over all Markov state | |
# probabilities | |
mu_t += self.mu_matrix[t][j] * self.delta_matrix[t][j] | |
prob_y_t += poisson.pmf(self.y[t], mu_t) | |
#This is a bit of a kludge. If the likelihood turns out to be real tiny, fix it to | |
# the EPS value for the machine | |
if prob_y_t < self.EPS: | |
prob_y_t = self.EPS | |
#Push the LL into the list of LLs | |
ll.append(math.log(prob_y_t)) | |
ll = np.array(ll) | |
return ll | |
#This function just tries its best to compute an invertible Hessian so that the standard | |
# errors and confidence intervals of all the trained parameters can be computed successfully. | |
def hessian(self, params): | |
for approx_hess_func in [approx_hess3, approx_hess2, approx_hess1]: | |
H = approx_hess_func(x=params, f=self.loglike, epsilon=self.EPS) | |
if np.linalg.cond(H) < 1 / self.EPS: | |
print('Found invertible hessian using '+str(approx_hess_func)) | |
return H | |
print('DID NOT find invertible hessian') | |
H[H == 0.0] = self.EPS | |
return H |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment