Created
September 21, 2019 12:18
-
-
Save justinian336/fd87a724c4abd80a5b92565498072064 to your computer and use it in GitHub Desktop.
John Rust's NFXP algorithm
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Lazy implementation of John Rust's Nested Fixed Point (NFXP) algorithm. | |
# Based on https://notes.quantecon.org/submission/5c4906d9f68373000f919cd0 | |
# Download first the Aguirregabiria and Mira NPL code from here: http://individual.utoronto.ca/vaguirre/wpapers/program_code_survey_joe_2008.html | |
import time | |
import numpy as np | |
import pandas as pd | |
from datetime import date | |
from statsmodels.base.model import GenericLikelihoodModel | |
from scipy import stats | |
from scipy.interpolate import interp1d | |
import matplotlib.pyplot as plt | |
# Read the data | |
def get_data(): | |
data = np.fromfile('./aguirregabiria_mira/bus1234.dat') | |
data = data.reshape(int(len(data)/6),6) | |
data = pd.DataFrame(data,columns=['id','group','year','month','replace','miles']) | |
data['miles'] = (data['miles'])/1e6 | |
data['date'] = pd.to_datetime(data[['year', 'month']].assign(Day=1)) | |
data = data[['id','group','date','replace','miles']] | |
date_lag = data.copy() | |
date_lag['date'] = date_lag['date'] - pd.DateOffset(months=1) | |
data = data.merge(date_lag, how='left', on=['id','group','date'] , suffixes=('','_next')) | |
data = data.dropna() | |
return data | |
# Obtain density of miles and transition probabilities | |
def get_miles_states(miles, step): | |
states = np.arange(miles.min(), miles.max(), step) | |
return states | |
def estimate_miles_pdf(df, step): | |
dx = (1 - df['replace'])*(df['miles_next'] - df['miles']) + df['replace']*df['miles_next'] | |
states = np.arange(dx.min(), dx.max(), step) | |
kernel = stats.gaussian_kde(dx, bw_method='silverman') | |
pdfdx = kernel(states) | |
return np.array([pdfdx/pdfdx.sum()]).transpose() | |
# Gets the transition probability assuming no replacement | |
def get_transition_probs(df, step): | |
pdfdx = estimate_miles_pdf(df, step).transpose() | |
miles_states = get_miles_states(df['miles'], step) | |
end_zeros = np.zeros((1, len(miles_states) - pdfdx.shape[1])) | |
fmat = np.concatenate((pdfdx, end_zeros), axis=1) | |
for row in range(1, len(miles_states)): | |
cutoff = len(miles_states) - row - pdfdx.shape[1] | |
# Simply append the densities, sliding them one column every row while not reaching the end | |
if cutoff >=0: | |
start_zeros = np.zeros((1, row)) | |
end_zeros = np.zeros((1, len(miles_states) - pdfdx.shape[1] - row)) | |
fmat_new = np.concatenate((start_zeros, pdfdx, end_zeros), axis=1) | |
fmat = np.concatenate((fmat, fmat_new)) | |
# Reached the end, must redistribute the density to make sure that the rows sum to one | |
else: | |
remaining_pdf = pdfdx[:, 0:cutoff] | |
redistributed_pdf = remaining_pdf/remaining_pdf.sum() | |
start_zeros = np.zeros((1, row)) | |
fmat_new = np.concatenate((start_zeros, redistributed_pdf), axis=1) | |
fmat = np.concatenate((fmat, fmat_new)) | |
return fmat | |
# Maximum Likelihood | |
class NFXPML(GenericLikelihoodModel): | |
def __init__(self, df, ev, **kwds): | |
super(NFXPML, self).__init__(df['replace'].values, df['miles'].values, **kwds) | |
self.step = 0.00195 | |
self.trans = get_transition_probs(df, self.step) | |
self.ev = ev | |
self.states = get_miles_states(self.exog, self.step) | |
self.data.xnames = ['tr', 'c'] | |
self.contraction_tolerance = 10e-16 | |
self.beta = 0.9999 | |
def get_linear_cost_vector(self, cost_theta): | |
c = cost_theta[0]*self.states.reshape((len(self.states), 1)) | |
return c | |
def get_square_root_cost_vector(self, cost_theta): | |
return cost_theta[0]*np.sqrt(self.states.reshape((len(self.states), 1))) | |
# Fixed Point: | |
def contraction_iterations(self, theta): | |
keep_iterating = True | |
start_time = time.time() | |
while keep_iterating: | |
ev_0 = self.ev | |
n,_ = ev_0.shape | |
tr = theta[0] | |
cost_theta = theta[1:] | |
c = self.get_linear_cost_vector(cost_theta) | |
non_replacement_case = -c + self.beta*ev_0 | |
# The following parameter recenters the probability estimate to avoid over/underflow | |
K = non_replacement_case.max() | |
within_log = np.exp(non_replacement_case - K) + np.exp(-tr - c[0,0] + self.beta*ev_0[0,0] - K) | |
ev_1 = np.log(within_log) | |
ev_1 = K + np.matmul(self.trans, ev_1) | |
self.ev = ev_1 | |
deviation = np.abs(np.max(ev_1 - ev_0, axis=0)) | |
if deviation < self.contraction_tolerance: | |
keep_iterating = False | |
end_time = time.time() | |
lapsed = end_time - start_time | |
return self.ev | |
def get_choice_pr(self, theta): | |
tr = theta[0] | |
cost_theta = theta[1:] | |
c = self.get_linear_cost_vector(cost_theta) | |
non_replacement_prob = 1/(1 + np.exp(c - self.beta*self.ev - tr - c[0,0] + self.beta*self.ev[0,0])) | |
states = self.states.reshape((len(self.states), 1)) | |
pr = interp1d(states.squeeze(), non_replacement_prob.squeeze()) | |
x = (self.exog/self.step).astype(int)*self.step | |
pr = pr(x.squeeze()) | |
non_replacement_prob = (1 - self.endog)*pr + self.endog*(1 - pr) | |
return non_replacement_prob | |
def loglike(self, theta): | |
# Inner loop: Fixed Point | |
self.contraction_iterations(theta) | |
# Outer loop: ML | |
choice_pr = self.get_choice_pr(theta) | |
tr, c = theta | |
print('Current tr={:.4f}, c={:.4f}'.format(tr, c)) | |
ll = np.log(choice_pr).sum() | |
return ll | |
df = get_data() | |
states = get_miles_states(df['miles'], 0.00195) | |
solver = NFXPML(df, np.ones(len(states)).reshape((len(states), 1))) | |
res = solver.fit(start_params=np.ones((2,1))*0.5, maxiter=100000, full_output=True, disp=True) | |
print(res.summary()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment