Skip to content

Instantly share code, notes, and snippets.

Created November 19, 2017 02:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jdickinson202/46f4e6ed3c1d0c345bc6fb075c0bac87 to your computer and use it in GitHub Desktop.
Save jdickinson202/46f4e6ed3c1d0c345bc6fb075c0bac87 to your computer and use it in GitHub Desktop.
basic panel data models for statsmodels
import scipy
import numpy as np
import pandas as pd
import statsmodels.api as sm
from import chain_dot
from import cache_readonly
from statsmodels.regression.linear_model import (RegressionModel,
class Group():
def __init__(self, index_pandas=None, index_list=None):
index_pandas pandas (multi-)index
index_list list of arrays, pandas series or lists, all of which are
of length nobs
if index_list != None:
tup = zip(*args)
self.index = pd.MultiIndex.from_tuples(tup)
raise Exception("index_list must be a list of lists, pandas series, \
or numpy arrays, each of identitcal length nobs.")
self.index = index_pandas
self.nobs = len(self.index)
self.slices = None
def get_slices(self):
'''Only works on first index level'''
groups = self.index.get_level_values(0).unique()
self.slices = [self.index.get_loc(x) for x in groups]
def count_categories(self, level=0):
self.counts = np.bincount(self.index.labels[level])
def transform_dataframe(self, dataframe, function, level=0):
'''Apply function to each column, by group
Assumes that the dataframe already has a proper index'''
if dataframe.shape[0] != self.nobs:
raise Exception('dataframe does not have the same shape as index')
out = dataframe.groupby(level=level).apply(function)
if 1 in out.shape:
return np.ravel(out)
return np.array(out)
def transform_array(self, array, function, level=0):
'''Apply function to each column, by group'''
if array.shape[0] != self.nobs:
raise Exception('array does not have the same shape as index')
dataframe = pd.DataFrame(array, index=self.index)
return self.transform_dataframe(dataframe, function, level=level)
def transform_slices(self, array, function, **kwargs):
'''Assumes array is a 1D or 2D numpy array'''
if array.shape[0] != self.nobs:
raise Exception('array does not have the same shape as index')
if self.slices == None:
processed = []
for s in self.slices:
if array.ndim == 2:
subset = array[s,:]
elif array.ndim == 1:
subset = array[s]
processed.append(function(subset, s, **kwargs))
return np.concatenate(processed)
def dummy_sparse(self, level=0):
'''create a sparse indicator from a group array with integer labels
groups: ndarray, int, 1d (nobs,) an array of group indicators for each
observation. Group levels are assumed to be defined as consecutive
integers, i.e. range(n_groups) where n_groups is the number of
group levels. A group level with no observations for it will still
produce a column of zeros.
indi : ndarray, int8, 2d (nobs, n_groups)
an indicator array with one row per observation, that has 1 in the
column of the group level for that observation
>>> g = np.array([0, 0, 2, 1, 1, 2, 0])
>>> indi = dummy_sparse(g)
>>> indi
<7x3 sparse matrix of type '<type 'numpy.int8'>'
with 7 stored elements in Compressed Sparse Row format>
>>> indi.todense()
matrix([[1, 0, 0],
[1, 0, 0],
[0, 0, 1],
[0, 1, 0],
[0, 1, 0],
[0, 0, 1],
[1, 0, 0]], dtype=int8)
current behavior with missing groups
>>> g = np.array([0, 0, 2, 0, 2, 0])
>>> indi = dummy_sparse(g)
>>> indi.todense()
matrix([[1, 0, 0],
[1, 0, 0],
[0, 0, 1],
[1, 0, 0],
[0, 0, 1],
[1, 0, 0]], dtype=int8)
groups = self.index.labels[level]
indptr = np.arange(len(groups)+1)
data = np.ones(len(groups), dtype=np.int8)
self.dummies = scipy.sparse.csr_matrix((data, groups, indptr))
class PanelLM(RegressionModel):
def __init__(self, endog, exog, method='pooling', effects='oneway',
unit=None, time=None, hasconst=None, **kwargs):
if (time != None) and (unit != None):
self.groupings = Group(index_list=[unit, time])
self.groupings = Group(index_pandas=endog.index)
self.endog = endog
self.exog = exog
self.method = method
self.effects = effects
if method == 'swar':
self.var_u, self.var_e, self.theta = swar_ercomp(self.endog, self.exog)
super(PanelLM, self).__init__(endog, exog, **kwargs)
def initialize(self, unit=None, time=None):
self.wexog = self.whiten(self.exog)
self.wendog = self.whiten(self.endog)
self.nobs = float(self.wexog.shape[0])
self.rank = np.rank(self.exog)
self.df_model = float(self.rank - self.k_constant)
if self.method == 'within':
self.df_resid = self.nobs - self.rank - self.groupings.index.levshape[0]
self.df_resid = self.nobs - self.rank - 1
self.df_model = float(self.rank - self.k_constant)
def whiten(self, data):
g = self.groupings
if self.method == 'within':
f = lambda x: x - x.mean()
if (self.effects == 'oneway') or (self.effects == 'unit'):
out = g.transform_array(data, f, 0)
return out
elif (self.effects == 'time'):
out = g.transform_array(data, f, 1)
return out
elif (self.effects == 'twoways'):
out = g.transform_array(data, f, 0)
out = g.transform_array(data, f, 1)
return out
raise Exception('Method must be unit, time, oneway, or twoways')
elif self.method == 'between':
f = lambda x: x.mean()
out = g.transform_array(data, f, 0)
return out
elif self.method == 'fd':
f = lambda x: (x - x.shift(1))[1:],
out = g.transform_array(data, f, 0)
return out
elif self.method == 'pooling':
return data
elif self.method == 'swar':
out = g.transform_slices(array=data, function=swar_transform,
return out
def fit(self, method="pinv", **kwargs):
wexog = self.wexog
self.pinv_wexog = pinv_wexog = np.linalg.pinv(wexog)
self.normalized_cov_params =, pinv_wexog.T)
beta =, self.wendog)
lfit = PanelLMResults(self, beta,
return lfit
def swar_ercomp(y, X):
'''Swamy-Arora error decomposition'''
b = PanelLM(y, X, 'between').fit()
w = PanelLM(y, X, 'within').fit()
Ts = w.model.groupings.counts
Th = scipy.stats.mstats.hmean(Ts)
var_e = w.ssr / (X.shape[0] - w.model.groupings.index.levshape[0] - X.shape[1] + 1)
var_u = b.ssr / (b.model.groupings.index.levshape[0] - X.shape[1]) - var_e / Th
var_u = max(var_u, 0)
Ts = np.concatenate([np.repeat(x,x) for x in Ts])
theta = 1 - np.sqrt(var_e / (Ts * var_u + var_e))
return var_e, var_u, np.array(theta)
def swar_transform(subset, position, theta):
'''Apply to a sub-group of observations'''
n = subset.shape[0]
B = np.ones((n,n)) / n
out = subset - chain_dot(np.diag(theta[position]), B, subset)
return out
class PanelLMResults(RegressionResults):
def __init__(self, model, params, normalized_cov_params=None):
super(PanelLMResults, self).__init__(model, params)
self.normalized_cov_params = normalized_cov_params
def bse(self):
if self.model.method == 'within':
scale = self.nobs - self.model.groupings.index.levshape[0] - self.df_model
scale = np.sum(self.wresid**2) / scale
bse = np.sqrt(np.diag(self.cov_params(scale=scale)))
bse = np.sqrt(np.diag(self.cov_params()))
return bse
def ssr(self):
return np.sum(self.wresid**2)
def resid(self):
if (self.model.method == 'within') and not (self.model.panel_balanced):
Xb_bar =, self.params)
alph = np.mean(self.model.endog) - Xb_bar
pred = alph +, self.params)
resid = self.model.endog - pred
resid = self.model.wendog - self.model.predict(self.params,
return resid
def fvalue(self):
f = self.mse_model/self.mse_resid
if self.model.method == 'between':
# TODO: Why is this correct (from Stata)?
f = f / 2.
return f
def scale(self):
wresid = self.wresid
return, wresid) / self.df_resid
def pooltest(endog, exog):
'''Chow poolability test: F-test of joint significance for the unit dummies
in a LSDV model
F statistic for the null hypothesis that unit effects are zero.
p value
# TODO: Does this assume balanced panels?
unrestricted = PanelLM(endog, exog, 'within').fit()
restricted = PanelLM(endog, exog, 'pooling').fit()
N = unrestricted.model.panel_N
T = unrestricted.model.panel_T
K = unrestricted.model.exog.shape[1]
urss = unrestricted.ssr
rrss = restricted.ssr
F = ((rrss - urss) / (N-1)) / (urss / (N*T - N - K))
p = 1 - scipy.stats.distributions.f.cdf(F, N-1, N*(T-1)-K)
return F, p
from statsmodels.iolib.summary import summary_col
from patsy import dmatrices
def test(y, X):
mod1 = PanelLM(y, X, method='pooling').fit()
mod2 = PanelLM(y, X, method='between').fit()
mod3 = PanelLM(y, X.ix[:,1:], method='within').fit()
mod4 = PanelLM(y, X.ix[:,1:], method='within', effects='time').fit()
mod5 = PanelLM(y, X.ix[:,1:], method='within', effects='twoways').fit()
mod6 = PanelLM(y, X, 'swar').fit()
mn = ['OLS', 'Between', 'Within N', 'Within T', 'Within 2w', 'RE-SWAR']
out = summary_col([mod1, mod2, mod3, mod4, mod5, mod6], model_names=mn, stars=False)
return out
url = ''
url = 'EmplUK.csv'
gasoline = pd.read_csv(url).set_index(['firm', 'year'])
f = 'emp~wage+capital'
y, X = dmatrices(f, gasoline, return_type='dataframe')
# Statsmodels
OLS Between Within N Within T Within 2w RE-SWAR
Intercept 10.3598 10.5060 9.0390
(1.2023) (3.3055) (1.1015)
capital 2.1049 2.2673 0.8015 2.0999 0.7854 1.1564
(0.0441) (0.1136) (0.0641) (0.0470) (0.0622) (0.0592)
wage -0.3238 -0.3371 -0.1436 -0.3334 -0.0955 -0.1589
(0.0487) (0.1338) (0.0328) (0.0527) (0.0356) (0.0338)
N 1031 140 1031 1031 1031 1031
R2 0.693 0.748 0.164 0.695 0.155 0.283
Standard errors in parentheses.
# Stata replication
insheet using EmplUK.csv, clear
xtset firm year
xtreg emp wage capital, fe
# R replication
dat = read.csv('EmplUK.csv')
dat = pdata.frame(dat, c('firm', 'year'))
mod = plm(emp ~ wage + capital, data=dat, model='within')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment