Skip to content

Instantly share code, notes, and snippets.

@BioSciEconomist
Last active April 9, 2021 02:02
Show Gist options
  • Save BioSciEconomist/fe9078d61eb6c86de9b5529873b09d29 to your computer and use it in GitHub Desktop.
Save BioSciEconomist/fe9078d61eb6c86de9b5529873b09d29 to your computer and use it in GitHub Desktop.
demonstrate the impact of small samples and errors in sign and magnitude
# *-----------------------------------------------------------------
# | PROGRAM NAME: ex sample size.py
# | DATE: 4/8/21
# | CREATED BY: MATT BOGARD
# | PROJECT FILE:
# *----------------------------------------------------------------
# | PURPOSE: demonstrate the impact of small samples and errors in sign and magnitude
# *----------------------------------------------------------------
import pandas as pd
import numpy as np
from matplotlib import pyplot
import statsmodels.api as sm # import statsmodels
import statsmodels.formula.api as smf # this allows us to use an explicit formulation
import statsmodels
from statsmodels.stats import power
pd.set_option('display.float_format', '{:.2f}'.format) # suppress sci notation
#------------------------------------------
# big difference moderate noise
#------------------------------------------
# Simulate a 'population' of 10,000 treated and 10,000 control subjects.
# Treatment shifts mean weight change to -5 (sd 5); control is mean 0 (sd 5).
# NOTE(review): both groups are generated from the same seed (123), so the
# treated draws are exactly the control draws shifted by -5 — presumably
# intentional so the population effect is exactly -5; confirm.
trt = pd.DataFrame(columns=['wtchg','treat'])
np.random.seed(123)
trt['wtchg'] = np.random.normal(-5,5,10000)  # treated: mean -5, sd 5
trt['treat'] = 1  # treatment indicator
ctrl = pd.DataFrame(columns=['wtchg','treat'])
np.random.seed(123)
ctrl['wtchg'] = np.random.normal(0,5,10000)  # control: mean 0, sd 5
ctrl['treat'] = 0
df = pd.concat([trt,ctrl],ignore_index=True)  # stack into one analysis frame
df.describe()
df.groupby('treat')['wtchg'].mean()  # group means: ~ -5 vs ~ 0
# visualize: overlapping histograms of the two groups
bins = np.linspace(-30, 30, 100)
pyplot.hist(df.wtchg[df.treat == 1], bins, alpha=0.5, label='treatment')
pyplot.hist(df.wtchg[df.treat == 0], bins, alpha=0.5, label='control')
pyplot.legend(loc='upper right')
#-------------------------------------------
# analysis
#-------------------------------------------
# OLS of wtchg on the treatment dummy: with a single binary regressor the
# 'treat' coefficient equals the difference in group means (the effect)
results = smf.ols('wtchg ~ treat', data=df).fit()
results.summary2()
#--------------------------------------
# power analysis
#-------------------------------------
diff = 5 # this is the size of the impact we want to measure
sigmaA = 5 # standard deviation for group A
sigmaB = 5 # standard deviation for group B (for now assume common variance between groups)
Q0 = .5 # proportion in the control group (.5 implies equal sized groups)
# pooled standard deviation; with Q0 = .5 and equal sigmas this reduces to sigmaA
sigma_pooled = np.sqrt((np.square(sigmaA)*(1/Q0 -1 ) + np.square(sigmaB)*(1/(1-Q0) -1 ) )/((1/Q0 -1 ) + (1/(1-Q0) -1 ))) # pooled standard deviation
#
d = abs(diff)/sigma_pooled # calculate effect size (Cohen's d = 1.0 here)
print(sigma_pooled)
print(diff,d)
# calculate required sample size per group for a two-sided t-test,
# alpha = .05, power = .80 (solves for nobs1 since it is passed as None)
statsmodels.stats.power.tt_ind_solve_power(effect_size= d, nobs1=None, alpha=.05, power= .80, ratio=1.0, alternative='two-sided')
nSize = 17  # required n per group from the power calculation above (~17)
#
# test a random sample of the required size
#
# NOTE(review): both samples use seed 123, so the same row positions are drawn
# from each group — presumably intentional for a clean demo; confirm.
np.random.seed(123)
sample_t = trt.sample(nSize, replace=True)
np.random.seed(123)
sample_c = ctrl.sample(nSize, replace=True)
# stack data
tmp = pd.concat([sample_t,sample_c],ignore_index=True)
# test impact: adequately powered, so the estimate should be near -5
results = smf.ols('wtchg ~ treat', data=tmp).fit()
results.summary2()
#------------------------------------------
# smaller difference moderate noise
#------------------------------------------
# Same setup as above but with a smaller true effect (-2 instead of -5),
# so a larger sample is needed to detect it reliably.
trt = pd.DataFrame(columns=['wtchg','treat'])
np.random.seed(123)
trt['wtchg'] = np.random.normal(-2,5,10000)  # treated: mean -2, sd 5
trt['treat'] = 1
ctrl = pd.DataFrame(columns=['wtchg','treat'])
np.random.seed(123)
ctrl['wtchg'] = np.random.normal(0,5,10000)  # control: mean 0, sd 5
ctrl['treat'] = 0
df = pd.concat([trt,ctrl],ignore_index=True)
df.describe()
df.groupby('treat')['wtchg'].mean()  # group means: ~ -2 vs ~ 0
# visualize: the distributions now overlap much more heavily
bins = np.linspace(-30, 30, 100)
pyplot.hist(df.wtchg[df.treat == 1], bins, alpha=0.5, label='treatment')
pyplot.hist(df.wtchg[df.treat == 0], bins, alpha=0.5, label='control')
pyplot.legend(loc='upper right')
#--------------------------------------
# power analysis
#-------------------------------------
diff = 2 # this is the size of the impact we want to measure
sigmaA = 5 # standard deviation for group A
sigmaB = 5 # standard deviation for group B (for now assume common variance between groups)
Q0 = .5 # proportion in the control group (.5 implies equal sized groups)
# pooled standard deviation; with Q0 = .5 and equal sigmas this reduces to sigmaA
sigma_pooled = np.sqrt((np.square(sigmaA)*(1/Q0 -1 ) + np.square(sigmaB)*(1/(1-Q0) -1 ) )/((1/Q0 -1 ) + (1/(1-Q0) -1 ))) # pooled standard deviation
#
d = abs(diff)/sigma_pooled # calculate effect size (Cohen's d = 0.4 here)
print(sigma_pooled)
print(diff,d)
# calculate required sample size per group (~100 for d = .4, power = .80)
statsmodels.stats.power.tt_ind_solve_power(effect_size= d, nobs1=None, alpha=.05, power= .80, ratio=1.0, alternative='two-sided')
#
# test a random sample of the required size
#
nsize = 100  # n per group from the power calculation above
np.random.seed(123)
sample_t = trt.sample(nsize, replace=True)
np.random.seed(123)
sample_c = ctrl.sample(nsize, replace=True)
# stack data
tmp = pd.concat([sample_t,sample_c],ignore_index=True)
# test impact: adequately powered, so the estimate should be near -2
results = smf.ols('wtchg ~ treat', data=tmp).fit()
results.summary2()
#
# what if we only had 10 people per group instead of the required 100?
#
nsize = 10  # deliberately underpowered: 10 per group vs the ~100 required
np.random.seed(123)
sample_t = trt.sample(nsize, replace=True)
np.random.seed(123)
sample_c = ctrl.sample(nsize, replace=True)
# stack data
tmp = pd.concat([sample_t,sample_c],ignore_index=True)
# test impact: with n = 10 the estimate is noisy and may be far from -2
results = smf.ols('wtchg ~ treat', data=tmp).fit()
results.summary2()
# what are additional risks from too small of a sample?
# (sign errors and exaggerated magnitudes — explored via simulation below)
nSize = 10  # per-group sample size used by the bootstrap simulation below
def run_tests(trt_dat, ctrl_dat, size, n_per_group=10):
    """Repeatedly draw small samples and re-estimate the treatment effect.

    For each of ``size`` replications, draws ``n_per_group`` rows (with
    replacement) from each population, fits ``wtchg ~ treat`` by OLS, and
    records the 'treat' coefficient (the estimated treatment effect).

    Parameters
    ----------
    trt_dat : pandas.DataFrame
        Treated population; must contain 'wtchg' and 'treat' columns.
    ctrl_dat : pandas.DataFrame
        Control population; same columns.
    size : int
        Number of bootstrap replications.
    n_per_group : int, default 10
        Rows sampled from each group per replication. (Previously read from
        the module-level ``nSize`` global; the default matches the value
        this script assigns to ``nSize``, so behavior is unchanged.)

    Returns
    -------
    tuple of numpy.ndarray
        ``(est, seeds)`` — the estimated 'treat' coefficients and the RNG
        seed used for the treated draw of each replication (so any single
        replication can be reproduced).
    """
    # Preallocate result arrays, one slot per replication
    est = np.empty(size)
    seeds = np.empty(size)
    for i in range(size):
        # NOTE(review): the control draw uses seed i + 1, which is reused as
        # the treatment seed of the next replication, so replications are not
        # fully independent — kept as-is to preserve reproducibility.
        np.random.seed(i)
        sample_t = trt_dat.sample(n_per_group, replace=True)
        np.random.seed(i + 1)
        sample_c = ctrl_dat.sample(n_per_group, replace=True)
        tmp = pd.concat([sample_t, sample_c], ignore_index=True)
        # With a single binary regressor, the OLS 'treat' coefficient equals
        # the difference in sample means (treated minus control)
        model = smf.ols('wtchg ~ treat', data=tmp).fit()
        est[i] = model.params['treat']
        seeds[i] = i
    return est, seeds
# run 500 small-sample replications against the population data
results = run_tests(trt,ctrl,500)
report = pd.DataFrame(columns=['seed','est'])
report['seed'] = results[1]  # seed used for each replication
report['est'] = results[0]   # estimated treatment effect for each replication
print(report)
# flag our errors (the simulated true effect is -2)
report['sign'] = np.where(report['est'] > 0,1,0)  # 1 = wrong sign (estimate positive)
report['magnitude'] = np.where(abs(report['est']) >= 4,1,0)  # 1 = at least double the true |effect|
report.head()
report.describe() # ~ 20% of the time we get the wrong sign or exaggerated magnitude
# subset the error cases: wrong-sign replications, largest first
tmp = report[report.sign == 1]
tmp= tmp.sort_values('est', ascending=False)
tmp.head(10)
# exaggerated-magnitude replications, largest first
tmp = report[report.magnitude== 1]
tmp= tmp.sort_values('est', ascending=False)
tmp.head(10)
# simulate a random sample with an exaggerated magnitude or wrong sign
# (seeds 446/447 presumably picked from the error cases flagged above — verify)
nsize = 10
np.random.seed(446)
sample_t = trt.sample(nsize, replace=True)
np.random.seed(447)
sample_c = ctrl.sample(nsize, replace=True)
# stack data
tmp = pd.concat([sample_t,sample_c],ignore_index=True)
# test impact of treatment
results = smf.ols('wtchg ~ treat', data=tmp).fit()
results.summary2()
# what decision would you make if you had gotten a sample and result like this?
#------------------------------------------
# what if there are no differences
#------------------------------------------
# Both groups share mean -2 and sd 10 (true treatment effect is zero).
# Different seeds (123 vs 967) make the two groups' noise independent.
trt = pd.DataFrame(columns=['wtchg','treat'])
np.random.seed(123)
trt['wtchg'] = np.random.normal(-2,10,10000)  # treated: mean -2, sd 10
trt['treat'] = 1
ctrl = pd.DataFrame(columns=['wtchg','treat'])
np.random.seed(967)
ctrl['wtchg'] = np.random.normal(-2,10,10000)  # control: same mean -2, sd 10
ctrl['treat'] = 0
df = pd.concat([trt,ctrl],ignore_index=True)
df.describe()
df.groupby('treat')['wtchg'].mean()  # both group means ~ -2
# visualize: distributions should be nearly indistinguishable
bins = np.linspace(-30, 30, 100)
pyplot.hist(df.wtchg[df.treat == 1], bins, alpha=0.5, label='treatment')
pyplot.hist(df.wtchg[df.treat == 0], bins, alpha=0.5, label='control')
pyplot.legend(loc='upper right')
#--------------------------------------
# power analysis
#-------------------------------------
# Powered to detect a difference of 5 even though the true effect is zero —
# the point is to see what estimates look like when there is nothing to find.
diff = 5 # this is the size of the impact we want to measure
sigmaA = 10 # standard deviation for group A
sigmaB = 10 # standard deviation for group B (for now assume common variance between groups)
Q0 = .5 # proportion in the control group (.5 implies equal sized groups)
# pooled standard deviation; with Q0 = .5 and equal sigmas this reduces to sigmaA
sigma_pooled = np.sqrt((np.square(sigmaA)*(1/Q0 -1 ) + np.square(sigmaB)*(1/(1-Q0) -1 ) )/((1/Q0 -1 ) + (1/(1-Q0) -1 ))) # pooled standard deviation
#
d = abs(diff)/sigma_pooled # calculate effect size (Cohen's d = 0.5 here)
print(sigma_pooled)
print(diff,d)
# calculate required sample size per group (~64 for d = .5, power = .80)
statsmodels.stats.power.tt_ind_solve_power(effect_size= d, nobs1=None, alpha=.05, power= .80, ratio=1.0, alternative='two-sided')
nSize = 64  # n per group from the power calculation above
# NOTE(review): seeds are commented out, so these draws (and the results
# below) change on every run — presumably intentional for demonstration.
#np.random.seed(123)
sample_t = trt.sample(nSize, replace=True)
#np.random.seed(123)
sample_c = ctrl.sample(nSize, replace=True)
# stack data
tmp = pd.concat([sample_t,sample_c],ignore_index=True)
# test impact of treatment: estimate should be near zero at this sample size
# regression using smf
results = smf.ols('wtchg ~ treat', data=tmp).fit()
results.summary2()
nSize = 20  # repeat with a smaller (underpowered) sample
#np.random.seed(123)
sample_t = trt.sample(nSize, replace=True)
#np.random.seed(123)
sample_c = ctrl.sample(nSize, replace=True)
# stack data
tmp = pd.concat([sample_t,sample_c],ignore_index=True)
# test impact of treatment: small n means noisier (possibly misleading) estimates
# regression using smf
results = smf.ols('wtchg ~ treat', data=tmp).fit()
results.summary2()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment