'''
Empirical Likelihood with Moment Constraints

Author: Skipper Seabold
https://gist.github.com/2109628

Josef Perktold: adjustments to the example
'''
import numpy as np
from numpy import asarray
from scipy import optimize

from statsmodels.base.model import LikelihoodModel

def log_star(z, eps):
    """
    Owen's log* function: log(z) for z > eps, with a quadratic
    continuation below eps so the value stays finite near and below zero.
    """
    log_idx = z > eps
    loggedarr = np.zeros_like(z)
    z_ok = z[log_idx]
    z_star = z[~log_idx]
    loggedarr[log_idx] = np.log(z_ok)
    loggedarr[~log_idx] = (np.log(eps) - 1.5 + 2 * z_star / eps
                           - z_star**2 / (2 * eps**2))
    return loggedarr
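
# Quick illustrative check of log_star (a sketch, not part of the original
# gist): above eps it agrees exactly with np.log, and it stays finite even
# for z <= 0, which is what the inner optimization below relies on.
#
#     z = np.array([-0.5, 1e-6, 0.5, 2.0])
#     eps = 1e-3
#     (log_star(z, eps)[z > eps] == np.log(z[z > eps])).all()  # True
#     np.isfinite(log_star(z, eps)).all()                      # True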

class EstEqs(LikelihoodModel):
    """
    Class for estimating equations.

    `esteqs` is a single callable returning the stacked moment conditions
    and `args` is an iterable of extra arguments passed to it.  The
    estimating equations are called as

        esteqs(params, *args)

    and should return an array of shape (nobs, n_moments).  (An earlier
    design accepted an iterable of functions, one per moment condition.)
    """
    def __init__(self, endog, exog=None, esteqs=(), args=()):
        if not esteqs:
            raise ValueError("Specify estimating equations")
        self.esteqs = esteqs
        self._args = args
        super(EstEqs, self).__init__(endog, exog)

    def _get_esteqs(self, params):
        return self.esteqs(params, *self._args)

    def initialize(self):
        # unnecessary with this syntax, kept to satisfy LikelihoodModel
        pass
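
# A minimal usage sketch (assumed example, not part of the original gist):
# the single callable returns all moment conditions column-stacked, one
# column per equation.  For the single condition E[y] - mu = 0 and data Y
# of shape (nobs, 1):
#
#     def mean_esteq(params, Y):
#         return Y - params              # shape (nobs, 1)
#
#     mod = EstEqs(Y, esteqs=mean_esteq, args=(Y,))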

class EL(EstEqs):
    """
    Empirical Likelihood

    Notes
    -----
    The API is experimental.
    """
    def __init__(self, endog, exog=None, esteqs=(), args=()):
        super(EL, self).__init__(endog, exog, esteqs, args)
        self.nobs = len(endog)

    def pmf(self, params):
        """Implied probabilities given `params` and the stored multipliers."""
        nobs = self.nobs
        esteqs = self._get_esteqs(params)
        constraints = self.constraints
        # should use np.dot; note the sign (1 + lam'g) differs from the
        # (1 - lam'g) convention used in emploglike
        return (nobs * (1 + (constraints * esteqs).sum(1)))**-1

    def _get_constraints(self, constraints, params):
        # gradient of the inner problem in the multipliers; a root of this
        # gives the EL `constraints` (Lagrange multipliers)
        esteqs = self._get_esteqs(params)
        nobs = self.nobs
        return 1. / nobs * np.sum(1. / (1 - (constraints * esteqs).sum(1))[:, None]
                                  * esteqs, axis=0)

    def fit(self, start_params, params_bounds=None):
        # start_params is required; without exog there is no way to guess
        # them.  TODO: for a linear model, default to a vector of length
        # exog.shape[1].
        start_params = np.array(start_params)
        ret = optimize.fmin_tnc(self.emploglike, start_params,
                                approx_grad=True, bounds=params_bounds,
                                eta=1e-4, pgtol=1e-8, xtol=1e-5,
                                ftol=1e-24, messages=0)
        self.params = ret[0]
        self.results = ret

    def emploglike(self, params):
        """
        Dual (scaled) empirical likelihood, log(L_{EL}(params; endog)).

        The Lagrange multipliers (`constraints`) are profiled out by an
        inner optimization for each value of `params`.
        """
        nobs = self.nobs
        eps = 1. / nobs

        # try the approach from the R docs
        def P(constraints, params):
            # inner objective in the multipliers
            esteq = self._get_esteqs(params)
            return -1. / nobs * np.sum(log_star(1 - (constraints * esteq).sum(1),
                                                eps))

        # note: [0, 0] hard-codes two moment conditions
        constraints = optimize.fmin_tnc(P, [0, 0], args=(params,),
                                        approx_grad=True, messages=0)[0]
        self.constraints = constraints
        esteqs = self._get_esteqs(params)
        return 1. / nobs * np.sum(log_star(1 - (constraints * esteqs).sum(1),
                                           eps))

# TODO: looks close, but we sometimes run into negative values in the
# params; check the constraint that cannot be violated
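
# For reference, the algebra behind emploglike (standard dual empirical
# likelihood, see e.g. Owen (2001); spelled out here, not in the original
# gist): with moment conditions g_i(theta) and multipliers lam, the inner
# problem
#
#     min over lam of  -(1/nobs) * sum_i log*(1 - lam' g_i(theta))
#
# profiles out lam, and the implied probabilities under this sign
# convention are p_i = 1 / (nobs * (1 - lam' g_i(theta))).  log* replaces
# log so the objective stays defined when some 1 - lam' g_i becomes
# nonpositive during the search.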

if __name__ == "__main__":
    from scipy import stats

    nobs = 2000
    theta = 1.5
    #np.random.seed(1234)
    # alternative data generating processes that were tried:
    #Y = stats.norm.rvs(theta, np.sqrt(theta**2 + 1), size=(nobs, 1))
    #Y = stats.t.rvs(5, loc=theta, scale=np.sqrt(theta**2 + 1), size=(nobs, 1))
    # t(5) rescaled so that the variance is exactly theta**2 + 1
    # (the std of t(5) is sqrt(5/3) ~ 1.2909944487358056)
    Y = stats.t.rvs(5, loc=theta,
                    scale=np.sqrt(theta**2 + 1) / 1.2909944487358056,
                    size=(nobs, 1))
    # contamination experiments:
    #Y[:10] *= 2
    #Y[:10] += 0.2
    #Y = np.loadtxt('./Y_EL.txt')[:, None]

    def esteqs(params, Y):
        # moment conditions E[Y] = theta and E[Y**2] = 2*theta**2 + 1
        params = asarray([params])
        return np.column_stack((Y - params, Y**2 - 2 * params**2 - 1))

    def esteqs_grad(params, Y):
        # analytic gradient, currently unused (note the hard-coded 40,
        # which would have to be nobs)
        return np.column_stack((-np.ones((len(Y), 1)),
                                np.repeat(-4 * params, (40, 1))))

    ELmod = EL(Y, esteqs=esteqs, args=(Y,))
    # TODO: make args accept a bare Y as well as (Y,)
    ELmod.fit(np.array([Y.mean()]), params_bounds=[(Y.min(), Y.max())])
    th = ELmod.results[0]
    print('true, th, Y.mean()')
    print(theta, th, Y.mean())
    print('true, th**2+1, Y.var(), Y.var(ddof=1)')
    print(theta**2 + 1, th**2 + 1, Y.var(), Y.var(ddof=1))
    # small Monte Carlo comparison of EL against the plain sample moments
    res_mc = []
    nrepl = 100
    for i in range(nrepl):
        Y = stats.t.rvs(5, loc=theta,
                        scale=np.sqrt(theta**2 + 1) / 1.2909944487358056,
                        size=(nobs, 1))
        ELmod = EL(Y, esteqs=esteqs, args=(Y,))
        ELmod.fit(np.array([Y.mean()]), params_bounds=[(Y.min(), Y.max())])
        th = np.squeeze(ELmod.results[0])
        res_mc.append([th, Y.mean(), th**2 + 1, Y.var(), Y.var(ddof=1)])
    res_mc = np.asarray(res_mc)
    v = theta**2 + 1
    res_mc_dev = res_mc - [theta, theta, v, v, v]
    print(res_mc_dev.mean(0))  # bias
    print(res_mc_dev.std(0))   # standard deviation around the true values
    # EL has smaller bias and a slightly smaller RMSE