Skip to content

Instantly share code, notes, and snippets.

@OlafHaag
Last active October 1, 2020 01:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save OlafHaag/2e36125a3d34571284705d624e81b64f to your computer and use it in GitHub Desktop.
Save OlafHaag/2e36125a3d34571284705d624e81b64f to your computer and use it in GitHub Desktop.
Try to figure out how to use PyMC3 with Patsy for a Hierarchical Linear Model
"""
Having a 3 x (3) Mixed Design: 3 conditions as BETWEEN factor, 3 blocks as WITHIN factor.
There are multiple measurements in each block (trials).
There are 2 dependent variables: parallel & orthogonal
Since 'parallel' and 'orthogonal' measures have the same unit,
it can be viewed as 1 outcome called 'projection' in 2 different directions.
The interest here is in the VARIABILITY (e.g. standard deviation or variance) of the data
for parallel & orthogonal outcomes for blocks 1, 2, and 3.
The analysis should enable to make statements about whether conditions, blocks, direction
have an effect on the outcome.
Bonus:
Each user has an individual level of experience in the task.
Users rate the difficulty of each block they performed.
"""
# %%
import arviz as az
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import patsy
import pymc3 as pm
import seaborn as sns
import theano.tensor as tt
#################################
# Dummy Data #
#################################
# %%
n_trials = 30 # by block.
n_users = 30
n_obs = 3 * n_trials * n_users
df = pd.DataFrame({'user': np.random.randint(n_users, size=n_obs),
'block': 1 + np.random.randint(3, size=n_obs),
'parallel': np.random.normal(0.0, 1, size=n_obs),
'orthogonal': np.random.normal(0.0, 1, size=n_obs)})
df.sort_values(by=['user', 'block'], inplace=True)
df.reset_index(drop=True, inplace=True)
# Assign each user to a condition.
df['condition'] = df['user'].map(pd.Series(index=df['user'].unique(), data=np.random.choice(['A', 'B', 'C'], size=df['user'].nunique())))
# Experience for each user.
df['experience'] = df['user'].map(pd.Series(index=df['user'].unique(), data=np.random.choice([np.nan, 0, 1, 2, 3, 4], size=df['user'].nunique())))
# Give each block its own ID.
df['block_id'] = pd.factorize(df['user'].astype(str) + df['block'].astype(str))[0]
# Trials.
df['trial'] = df.groupby(['block_id'])['block'].transform(lambda x: np.arange(len(x)))
df = df[df['trial'] < n_trials]
# Categorical columns.
df[['user', 'condition', 'block_id', 'block']] = df[['user', 'condition', 'block_id', 'block']].astype('category')
# Give each block_id a rating.
df['rating'] = df['block_id'].map(pd.Series(index=df['block_id'].unique(), data=np.random.randint(5, size=df['block_id'].cat.categories.size)))
df[['parallel', 'orthogonal']] = df[['parallel', 'orthogonal']].astype(theano.config.floatX)
# Thin out rows to simulate more missing data.
mask = np.random.choice([*[True]*4, False], size=df.shape[0])
df = df[mask]
n_obs = len(df)
# Make sure mean of each block is zero as with real data.
df[['parallel', 'orthogonal']] = df.groupby(['user', 'block'])[['parallel', 'orthogonal']].transform(lambda x: x - x.mean())
# Scale block deviations.
scale_para = (9-3) * np.random.random_sample(df['block_id'].nunique()) + 3
scale_ortho = (7-1.5) * np.random.random_sample(df['block_id'].nunique()) + 1.5
scale = np.stack((scale_para, scale_ortho)).T
df[['parallel', 'orthogonal']] = df.groupby(['block_id'])['parallel', 'orthogonal'].apply(lambda x: x * scale[x.name])
# %%
# Reshape data into long format.
samples = df.melt(id_vars=['user', 'experience', 'condition', 'block_id', 'block', 'rating', 'trial'],
value_vars=['parallel', 'orthogonal'],
var_name='direction', value_name='projection')
samples['direction'] = samples['direction'].astype('category')
# %%
# I'm interested in the variance for each direction. The mean of the squared projection by block_id is that block's variance.
samples['log_projection_sq'] = samples['projection'].transform('square').transform('log')
# %%
# Vizualize the data.
g = sns.FacetGrid(samples, col='block', row='user', hue="direction")
g.despine(offset=10)
g = (g.map(sns.distplot, "log_projection_sq").add_legend(title="Projection", loc='upper right'))
for ax in g.axes.flat:
ax.set_ylabel("Probability Density")
ax.set_xlabel("log(Projection Length^2)")
plt.tight_layout()
plt.show()
# %%
#################################
# Coordinates #
#################################
conditions = samples['condition'].cat.categories.tolist()
n_conditions = samples['condition'].nunique()
condition_indices = df['condition'].cat.codes.values # Condition index for every single value.
condition_lookup = pd.Series(*pd.factorize(samples['condition'].cat.categories), name='condition_idx_lookup')
# We need to be able to index by user. But the user IDs of the sample don't start at 0. We create an index and a mapping between them.
users = samples['user'].cat.categories.tolist()
n_users = samples['user'].nunique()
user_indices = samples['user'].cat.codes.values # User index for every single value.
user_lookup = pd.Series(*pd.factorize(samples['user'].cat.categories), name='user_idx_lookup')
# User level predictors. More options: age group, gender.
# gaming_exp is not on an interval scale (no equidistance between items), but ordinal.
experience = samples.drop_duplicates(subset="user")['experience'] # Shape = user_indices
# Block indices for 1,2,3.
blocks = samples['block'].cat.categories.tolist()
n_blocks = samples['block'].nunique() # Same as using cat.categories.size
block_indices = samples['block'].cat.codes.values # Block index for every single value.
block_lookup = pd.Series(*pd.factorize(samples['block'].cat.categories), name='block_idx_lookup')
# Block ID indices. Shape = n_users * n_blocks.
block_ids = samples['block_id'].cat.categories.tolist()
n_block_ids = samples['block_id'].nunique()
block_id_indices = samples['block_id'].cat.codes.values # Block index for every single value.
block_id_lookup = pd.Series(*pd.factorize(samples['block_id'].cat.categories), name='block_id_idx_lookup')
# Block level predictors.
# Note: Ratings are correlated with block index.
ratings = samples.drop_duplicates(subset=["user", 'block'])['rating'] # Shape = block_id_indices
# Coded direction of projection: 0 = orthogonal, 1 = parallel.
directions = samples['direction'].cat.categories.tolist()
n_directions = samples['direction'].nunique()
direction_indices = samples['direction'].cat.codes.values
direction_lookup = pd.Series(*pd.factorize(samples['direction'].cat.categories), name='direction_idx_lookup')
coords = {'Direction': directions, 'Condition': conditions, 'User' :users, 'Block': blocks, 'Block_ID': block_ids, 'obs_id': np.arange(direction_indices
.size)}
#################################
# Multi-Level Model #
#################################
# %%
# Fixed effects. Commonly denoted as X.
X = patsy.dmatrix('1 + block * direction', samples, return_type='dataframe')
X = np.asarray(X)
# %%
# Design matrix for the random effects. Intercept and slope are modelled separately to have more control on the prior.
Z_intercept = patsy.dmatrix('0 + user', data=samples, return_type='dataframe')
Z_intercept = np.asarray(Z_intercept)
Z_slope = patsy.dmatrix('0 + user:direction', data=samples, return_type='dataframe')
Z_slope = np.asarray(Z_slope)
Z = np.concatenate((Z_intercept, Z_slope), axis=1)
print('Z_intercept has shape ({}, {})'.format(*Z_intercept.shape))
print('Z_slope has shape ({}, {})'.format(*Z_slope.shape))
print('Z has shape ({}, {})'.format(*Z.shape))
# %%
Y = np.asarray(samples['log_projection_sq'])
# %%
with pm.Model(coords=coords) as hlm_model:
## Fixed effect
beta_X_intercept = pm.Normal('beta_X_ortho', mu=0, sd=10) # contrain it to positive values
beta_X_slope_b2 = pm.Normal('beta_X_b2_ortho_offset', mu=0, sd=10)
beta_X_slope_b3 = pm.Normal('beta_X_b3_ortho_offset', mu=0, sd=10)
beta_X_slope_para = pm.Normal('beta_X_parallel_offset', mu=0, sd=10)
beta_X_slope_b2_para = pm.Normal('beta_X_b2_parallel_offset', mu=0, sd=10)
beta_X_slope_b3_para = pm.Normal('beta_X_b3_parallel_offset', mu=0, sd=10)
beta_X = tt.stack([beta_X_intercept,
beta_X_slope_b2,
beta_X_slope_b3,
beta_X_slope_para,
beta_X_slope_b2_para,
beta_X_slope_b3_para
])
estimate_X = pm.math.dot(X, beta_X)
## Random effect
# Non Centered version
sigma_Z_intercept = pm.HalfNormal('sigma_Z_intercept', sd=10)
gamma_Z_intercept_raw = pm.Normal('gamma_Z_intercept_raw', mu=0, sd=1, shape=Z_intercept.shape[1])
gamma_Z_intercept = pm.Deterministic('gamma_Z_intercept', gamma_Z_intercept_raw * sigma_Z_intercept)
sigma_Z_slope = pm.HalfNormal('sigma_Z_slope', sd=10)
gamma_Z_slope_raw = pm.Normal('gamma_Z_slope_raw', mu=0, sd=1, shape=Z_slope.shape[1])
gamma_Z_slope = pm.Deterministic('gamma_Z_slope', gamma_Z_slope_raw * sigma_Z_slope)
estimate_Z = pm.math.dot(Z_intercept, gamma_Z_intercept) + pm.math.dot(Z_slope, gamma_Z_slope)
## likelihood
mu_estimate = pm.Deterministic('mu_estimate', estimate_X + estimate_Z)
sigma_unexplained = pm.HalfNormal('sigma_unexplained', sd=10) # unexplained variability
y_likelihood = pm.Normal('y_likelihood', mu=mu_estimate, sd=sigma_unexplained, observed=Y)
# %%
pm.model_to_graphviz(hlm_model)
# %%
with hlm_model:
idata = pm.sample(2000, tune=2000, return_inferencedata=True)
# %%
az.summary(idata)
# %%
pm.traceplot(idata, varnames=['beta_X_ortho', 'beta_X_b2_ortho_offset', 'beta_X_b3_ortho_offset', 'beta_X_parallel_offset', 'beta_X_b2_parallel_offset', 'beta_X_b3_parallel_offset', 'gamma_Z_intercept', 'gamma_Z_slope', 'sigma_unexplained'])
# %%
pm.plot_posterior(idata, varnames=['beta_X_ortho', 'beta_X_b2_ortho_offset', 'beta_X_b3_ortho_offset', 'beta_X_parallel_offset', 'beta_X_b2_parallel_offset', 'beta_X_b3_parallel_offset', 'gamma_Z_intercept', 'gamma_Z_slope', 'sigma_unexplained'])
"""
Having a 3 x (3) Mixed Design: 3 conditions as BETWEEN factor, 3 blocks as WITHIN factor.
There are multiple measurements in each block (trials).
There are 2 dependent variables: parallel & orthogonal
Since 'parallel' and 'orthogonal' measures have the same unit,
it can be viewed as 1 outcome called 'projection' in 2 different directions.
The interest here is in the VARIABILITY (e.g. standard deviation or variance) of the data
for parallel & orthogonal outcomes for blocks 1, 2, and 3.
The analysis should enable to make statements about whether conditions, blocks, direction
have an effect on the outcome.
Bonus:
Each user has an individual level of experience in the task.
Users rate the difficulty of each block they performed.
"""
# %%
### Imports ###
import arviz as az
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import patsy
import pymc3 as pm
import seaborn as sns
import theano
import theano.tensor as tt
RANDOM_SEED = 2048
np.random.seed(386)
az.style.use('arviz-darkgrid')
# %%
### Dummy Data ###
##################
n_trials = 30 # by block.
n_users = 30
n_obs = 3 * n_trials * n_users
df = pd.DataFrame({'user': np.random.randint(n_users, size=n_obs),
'block': 1 + np.random.randint(3, size=n_obs),
'parallel': np.random.normal(0.0, 1, size=n_obs),
'orthogonal': np.random.normal(0.0, 1, size=n_obs)})
df.sort_values(by=['user', 'block'], inplace=True)
df.reset_index(drop=True, inplace=True)
# Assign each user to a condition.
df['condition'] = df['user'].map(pd.Series(index=df['user'].unique(), data=np.random.choice(['A', 'B', 'C'], size=df['user'].nunique())))
# Experience for each user.
df['experience'] = df['user'].map(pd.Series(index=df['user'].unique(), data=np.random.choice([np.nan, 0, 1, 2, 3, 4], size=df['user'].nunique())))
# Give each block its own ID.
df['block_id'] = pd.factorize(df['user'].astype(str) + df['block'].astype(str))[0]
# Trials.
df['trial'] = df.groupby(['block_id'])['block'].transform(lambda x: np.arange(len(x)))
df = df[df['trial'] < n_trials]
# Categorical columns.
df[['user', 'condition', 'block_id', 'block']] = df[['user', 'condition', 'block_id', 'block']].astype('category')
# Give each block_id a rating.
df['rating'] = df['block_id'].map(pd.Series(index=df['block_id'].unique(), data=np.random.randint(5, size=df['block_id'].cat.categories.size)))
df[['parallel', 'orthogonal']] = df[['parallel', 'orthogonal']].astype(theano.config.floatX)
# Thin out rows to simulate more missing data.
mask = np.random.choice([*[True]*4, False], size=df.shape[0])
df = df[mask]
n_obs = len(df)
# Make sure mean of each block is zero as with real data.
df[['parallel', 'orthogonal']] = df.groupby(['user', 'block'])[['parallel', 'orthogonal']].transform(lambda x: x - x.mean())
# Scale block deviations.
scale_para = (9-3) * np.random.random_sample(df['block_id'].nunique()) + 3
scale_ortho = (7-1.5) * np.random.random_sample(df['block_id'].nunique()) + 1.5
scale = np.stack((scale_para, scale_ortho)).T
df[['parallel', 'orthogonal']] = df.groupby(['block_id'])['parallel', 'orthogonal'].apply(lambda x: x * scale[x.name])
# %%
### Coordinates ###
###################
conditions = df['condition'].cat.categories.tolist()
n_conditions = df['condition'].nunique()
condition_indices = df['condition'].cat.codes.values # Condition index for every single value.
condition_lookup = pd.Series(*pd.factorize(df['condition'].cat.categories), name='condition_idx_lookup')
# We need to be able to index by user. But the user IDs of the sample don't start at 0. We create an index and a mapping between them.
users = df['user'].cat.categories.tolist()
n_users = df['user'].nunique()
user_indices = df['user'].cat.codes.values # User index for every single value.
user_lookup = pd.Series(*pd.factorize(df['user'].cat.categories), name='user_idx_lookup')
# User level predictors. More options: age group, gender.
# gaming_exp is not on an interval scale (no equidistance between items), but ordinal.
experience = df.drop_duplicates(subset="user")['experience'] # Shape = user_indices
# Block indices for 1,2,3.
blocks = df['block'].cat.categories.tolist()
n_blocks = df['block'].nunique() # Same as using cat.categories.size
block_indices = df['block'].cat.codes.values # Block index for every single value.
block_lookup = pd.Series(*pd.factorize(df['block'].cat.categories), name='block_idx_lookup')
# Block ID indices. Shape = n_users * n_blocks.
block_ids = df['block_id'].cat.categories.tolist()
n_block_ids = df['block_id'].nunique()
block_id_indices = df['block_id'].cat.codes.values # Block index for every single value.
block_id_lookup = pd.Series(*pd.factorize(df['block_id'].cat.categories), name='block_id_idx_lookup')
# Block level predictors.
# Note: Ratings are correlated with block index.
ratings = df.drop_duplicates(subset=["user", 'block'])['rating'] # Shape = block_id_indices
# Direction of projection.
directions = ['orthogonal', 'parallel']
n_directions = 2
coords = {'Direction': directions,
'Condition': conditions,
'User' :users,
'Block': blocks,
'Block_ID': block_ids,
'obs_id': np.arange(len(df))}
# %%
### Design Matrices ###
#######################
# For each block create an on/off [0, 1] switch for all trials.
block_mx = patsy.dmatrix('0 + block', df, return_type='dataframe').astype(int)
block_mx.columns = blocks
# %%
### Helper functions ###
def plot_ppc_loopit(idata, title):
fig = plt.figure(figsize=(12,9))
ax_ppc = fig.add_subplot(211)
ax1 = fig.add_subplot(223); ax2 = fig.add_subplot(224)
az.plot_ppc(idata, ax=ax_ppc)
for ax, ecdf in zip([ax1, ax2], (False, True)):
az.plot_loo_pit(idata, y="projection", ecdf=ecdf, ax=ax)
ax_ppc.set_title(title)
ax_ppc.set_xlabel("")
return np.array([ax_ppc, ax1, ax2])
def print_divergencies(trace):
# Display the total number and percentage of divergencies.
divergent = trace['diverging']
print(f"Number of Divergencies: {divergent.nonzero()[0].size}")
divperc = divergent.nonzero()[0].size / len(trace) * 100
print(f"Percentage of Divergencies: {divperc:.1f}")
# %%
### Null Model ###
##################
with pm.Model(coords=coords) as null_model:
# Wrapping the index in a Data object keeps it in the model
# and stores the variables in the constant_data group of an InferenceData object.
user_idx = pm.Data('user_idx', user_indices, dims='obs_id')
# Hyperpriors:
global_sigma = pm.Gamma('global_sigma', mu=5, sigma=5)
global_sigma_sd = pm.HalfCauchy('global_sigma_sd', 4.0)
# Varying user variability. How many priors of this kind we want is inferred from dims.
user_sd = pm.Gamma('user_sd', mu=global_sigma, sigma=global_sigma_sd, dims='User')
# Our coordinates aren't in long format, so we need to have the same prior 2 times to cover both directions.
theta = tt.stack((user_sd, user_sd)).T
# Using user_idx to index theta somehow tells which prior belongs to which user.
projection = pm.Normal('projection', mu=0, sigma=theta[user_idx], observed=df[directions], dims=('obs_id', 'Direction'))
# %%
# Show model graph.
pm.model_to_graphviz(null_model)
# %%
# Prior predictive check.
with null_model:
null_prior = pm.sample_prior_predictive(700)
null_idata = az.from_pymc3(prior=null_prior)
az.plot_ppc(null_idata, group="prior")
# %%
# Sample from posterior.
with null_model:
null_model.name = "Null Model"
null_trace = pm.sample(1000, tune=800)
# The dimensions and coordinates are lost during sampling.
# pm.MultiTrace objects do not store the labeled coordinates of their variables.
# But we can infer the coordinates from the model.
idata_aux = az.from_pymc3(null_trace, model=null_model)
# Add the posterior to our prior information.
null_idata.extend(idata_aux)
az.plot_posterior(null_idata, var_names=['global'], filter_vars='like')
# %%
az.plot_pair(null_idata, var_names=['global'], filter_vars='like')
# %%
az.summary(null_idata)
# %%
az.plot_trace(null_idata, compact=True, chain_prop={"ls": "-"})
# %%
# Posterior predictive check.
with null_model:
null_post_pred = pm.sample_posterior_predictive(null_idata)
idata_aux = az.from_pymc3(posterior_predictive=null_post_pred)
null_idata.extend(idata_aux)
# %%
az.loo(null_idata)
# %%
plot_ppc_loopit(null_idata, null_model.name)
# %%
# Main Effect Projection Direction Model (undirected) #
#######################################################
with pm.Model(coords=coords) as dir_model:
user_idx = pm.Data('user_idx', user_indices, dims='obs_id')
# Hyperpriors:
global_sigma = pm.Gamma('global_sigma', mu=5, sigma=5, dims='Direction')
global_sigma_sd = pm.HalfCauchy('global_sigma_sd', 4.0, dims='Direction')
# Varying user variability.
# Centered paramterization.
user_sd = pm.Gamma('user_sd', mu=global_sigma, sigma=global_sigma_sd, dims=('User', 'Direction'))
# Non-centered reparameterization.
#user_sd_offset = pm.Exponential('user_sd_offset', 1.0, dims=('User', 'Direction'))
#user_sd = pm.Deterministic('user_sd', global_sigma + user_sd_offset * global_sigma_sd, dims=('User', 'Direction'))
projection = pm.Normal('projection', mu=0, sigma=user_sd[user_idx], observed=df[directions], dims=('obs_id', 'Direction'))
# %%
# Show model graph.
pm.model_to_graphviz(dir_model)
# %%
# Prior predictive check.
with dir_model:
dir_prior = pm.sample_prior_predictive(700)
dir_idata = az.from_pymc3(prior=dir_prior)
#az.plot_ppc(dir_idata, group="prior")
# %%
# Sample from posterior.
with dir_model:
dir_model.name = "Main Effect Direction Model"
dir_trace = pm.sample(1000, tune=1200)#, target_accept=0.95)
# The dimensions and coordinates are lost during sampling.
# pm.MultiTrace objects do not store the labeled coordinates of their variables.
# But we can infer the coordinates from the model.
idata_aux = az.from_pymc3(dir_trace, model=dir_model)
# Add the posterior to our prior information.
dir_idata.extend(idata_aux)
print_divergencies(dir_trace)
# %%
az.plot_posterior(dir_idata, var_names=['global_sigma'])
# %%
az.plot_pair(dir_idata, var_names=['global'], filter_vars='like', divergences=True)
# %%
az.summary(dir_idata)
# %%
az.plot_trace(dir_idata, compact=True, chain_prop={"ls": "-"})
# %%
# Posterior predictive check.
with dir_model:
dir_post_pred = pm.sample_posterior_predictive(dir_idata)
idata_aux = az.from_pymc3(posterior_predictive=dir_post_pred)
dir_idata.extend(idata_aux)
# %%
az.loo(dir_idata)
# %%
plot_ppc_loopit(dir_idata, dir_model.name)
# %%
# Main Effect Projection Direction Model (directed) #
#####################################################
with pm.Model(coords=coords) as dir_model_r:
user_idx = pm.Data('user_idx', user_indices, dims='obs_id')
# Hyperpriors.
global_sigma_ortho = pm.Gamma("global_sigma_ortho", mu=3, sigma=5)
global_sigma_ortho_sd = pm.Exponential('global_sigma_ortho_sd', 0.5)
# Difference between orthohonal and parallel projections.
global_sigma_diff = pm.Gamma('sigma_diff', mu=3, sigma=5)
global_sigma_diff_sd = pm.Exponential('global_diff_sd', 1.0)
# The sum of two gamma-distributed variables is gamma-distributed.
global_sigma_parallel = pm.Deterministic("global_sigma_parallel", global_sigma_ortho + global_sigma_diff)
# Priors.
user_ortho = pm.Gamma('user_ortho', mu=global_sigma_ortho, sigma=global_sigma_ortho_sd, dims='User')
# Varying slopes:
user_diff = pm.Gamma('user_diff', mu=global_sigma_diff, sigma=global_sigma_diff_sd, dims='User')
user_parallel = pm.Deterministic('user_parallel', user_ortho + user_diff, dims='User')
# Expected deviation per user and direction:
theta = tt.stack((user_ortho, user_parallel)).T
# Observed.
projection = pm.Normal('projection', 0.0, sigma=theta[user_idx], observed=df[directions], dims=('obs_id', 'Direction'))
# %%
pm.model_to_graphviz(dir_model_r)
# %%
# Prior predictive check.
with dir_model:
dir_prior_r = pm.sample_prior_predictive(700)
dir_idata_r = az.from_pymc3(prior=dir_prior_r)
# %%
#az.plot_ppc(dir_idata, group="prior")
_, ax = plt.subplots()
dir_idata_r.prior.plot.scatter(x='global_sigma_ortho', y='global_sigma_parallel', color='k', alpha=0.2, ax=ax)
# %%
# Sample from posterior.
with dir_model_r:
dir_model_r.name = "Main Effect Direction Model"
dir_trace_r = pm.sample(1000, tune=1200)#, target_accept=0.95)
# The dimensions and coordinates are lost during sampling.
# pm.MultiTrace objects do not store the labeled coordinates of their variables.
# But we can infer the coordinates from the model.
idata_aux = az.from_pymc3(dir_trace_r, model=dir_model_r)
# Add the posterior to our prior information.
dir_idata_r.extend(idata_aux)
print_divergencies(dir_trace_r)
# %%
az.plot_posterior(dir_idata_r, var_names=['global_sigma'])
# %%
az.plot_pair(dir_idata_r, var_names=['global'], filter_vars='like', divergences=True)
# %%
az.summary(dir_idata_r)
# %%
az.plot_trace(dir_idata_r, compact=True, chain_prop={"ls": "-"})
# %%
# Posterior predictive check.
with dir_model_r:
dir_post_pred_r = pm.sample_posterior_predictive(dir_idata_r)
idata_aux = az.from_pymc3(posterior_predictive=dir_post_pred_r)
dir_idata_r.extend(idata_aux)
# %%
az.loo(dir_idata_r)
# %%
plot_ppc_loopit(dir_idata_r, dir_model_r.name)
# %%
# Main Effect Block Model (undirected) #
########################################
with pm.Model(coords=coords) as block_model:
user_idx = pm.Data('user_idx', user_indices, dims='obs_id')
block_idx = pm.Data('block_idx', block_indices, dims='obs_id')
# Hyperpriors.
global_sigma = pm.Gamma("global_sigma", mu=5, sigma=5, dims='Block')
global_sigma_sd = pm.HalfCauchy('global_sigma_sd', 4.0, dims='Block')
# Priors.
user_sd = pm.Gamma('user_sd', mu=global_sigma, sigma=global_sigma_sd, dims=('User', 'Block'))
# Expected deviation per user and direction:
theta = tt.stack((user_sd[user_idx, block_idx], user_sd[user_idx, block_idx])).T
# Observed.
projection = pm.Normal('projection', 0.0, sigma=theta, observed=df[directions], dims=('obs_id', 'Direction'))
# %%
pm.model_to_graphviz(block_model)
# %%
# Prior predictive check.
with block_model:
block_prior = pm.sample_prior_predictive(700)
block_idata = az.from_pymc3(prior=block_prior)
#az.plot_ppc(block_idata, group="prior")
# %%
# Sample from posterior.
with block_model:
block_model.name = "Main Effect Block Model"
block_trace = pm.sample(1000, tune=1200, target_accept=0.95)
# The dimensions and coordinates are lost during sampling.
# pm.MultiTrace objects do not store the labeled coordinates of their variables.
# But we can infer the coordinates from the model.
idata_aux = az.from_pymc3(block_trace, model=block_model)
# Add the posterior to our prior information.
block_idata.extend(idata_aux)
print_divergencies(block_trace)
# %%
az.plot_posterior(block_idata, var_names=['global_sigma'])
# %%
az.plot_pair(block_idata, var_names=['global'], filter_vars='like', divergences=True)
# %%
az.summary(block_idata)
# %%
az.plot_trace(block_idata, compact=True, chain_prop={"ls": "-"})
# %%
# Posterior predictive check.
with block_model:
block_post_pred = pm.sample_posterior_predictive(block_idata)
idata_aux = az.from_pymc3(posterior_predictive=block_post_pred)
block_idata.extend(idata_aux)
# %%
az.loo(block_idata)
# %%
plot_ppc_loopit(block_idata, block_model.name)
# %%
# Main Effect Block Model (directed) #
######################################
with pm.Model(coords=coords) as block_model_r:
user_idx = pm.Data('user_idx', user_indices, dims='obs_id')
block2_idx = pm.Data('block2_idx', block_mx[1].values, dims='obs_id')
# Hyperpriors.
global_sigma_b13 = pm.Gamma("global_sigma_blocks13", mu=5, sigma=5)
global_sigma_b13_sd = pm.HalfCauchy('global_sigma_blocks13_sd', 4.0)
global_sigma_diff = pm.Gamma("global_sigma_diff", mu=5, sigma=5)
global_sigma_diff_sd = pm.HalfCauchy('global_sigma_diff_sd', 4.0)
# The sum of two gamma-distributed variables is gamma-distributed.
global_sigma_b2 = pm.Deterministic("global_sigma_block2", global_sigma_b13 + global_sigma_diff)
# Priors.
user_sd_b13 = pm.Gamma('user_sd_blocks13', mu=global_sigma_b13, sigma=global_sigma_b13_sd, dims='User')
user_sd_diff = pm.Gamma('user_sd_diff', mu=global_sigma_diff, sigma=global_sigma_diff_sd, dims='User')
user_sd_b2 = pm.Deterministic("user_sd_block2", user_sd_b13 + user_sd_diff, dims='User')
# Expected deviation per user and direction:
theta_ = (1 - block2_idx) * user_sd_b13[user_idx] + block2_idx * user_sd_b2[user_idx]
theta = tt.stack((theta_, theta_)).T
# Observed.
projection = pm.Normal('projection', 0.0, sigma=theta, observed=df[directions], dims=('obs_id', 'Direction'))
# %%
pm.model_to_graphviz(block_model_r)
# %%
# Prior predictive check.
with block_model_r:
block_prior_r = pm.sample_prior_predictive(700)
block_idata_r = az.from_pymc3(prior=block_prior_r)
#az.plot_ppc(block_idata, group="prior")
# %%
# Sample from posterior.
with block_model_r:
block_model_r.name = "Main Effect Block 2>[1&3] Model"
block_trace_r = pm.sample(1000, tune=1200, target_accept=0.95)
# The dimensions and coordinates are lost during sampling.
# pm.MultiTrace objects do not store the labeled coordinates of their variables.
# But we can infer the coordinates from the model.
idata_aux = az.from_pymc3(block_trace_r, model=block_model_r)
# Add the posterior to our prior information.
block_idata_r.extend(idata_aux)
print_divergencies(block_trace_r)
# %%
az.plot_posterior(block_idata_r, var_names=['global_sigma'])
# %%
az.plot_pair(block_idata_r, var_names=['global'], filter_vars='like', divergences=True)
# %%
az.summary(block_idata_r)
# %%
az.plot_trace(block_idata_r, compact=True, chain_prop={"ls": "-"})
# %%
# Posterior predictive check.
with block_model_r:
block_post_pred_r = pm.sample_posterior_predictive(block_idata_r)
idata_aux = az.from_pymc3(posterior_predictive=block_post_pred_r)
block_idata_r.extend(idata_aux)
# %%
az.loo(block_idata_r)
# %%
plot_ppc_loopit(block_idata_r, block_model_r.name)
# %%
# Interaction Effect Projection Direction x Block Model (undirected) #
######################################################################
with pm.Model(coords=coords) as interaction_model:
user_idx = pm.Data('user_idx', user_indices, dims='obs_id')
block_idx = pm.Data('block_idx', block_indices, dims='obs_id')
# Hyperpriors:
global_sigma = pm.Gamma('global_sigma', mu=5, sigma=5, dims=('Direction', 'Block'))
global_sigma_sd = pm.HalfCauchy('global_sigma_sd', 4.0, dims=('Direction', 'Block'))
# Varying user variability.
# Centered paramterization.
user_sd = pm.Gamma('user_sd', mu=global_sigma, sigma=global_sigma_sd, dims=('User', 'Direction', 'Block'))
# Non-centered reparameterization.
#user_sd_offset = pm.Exponential('user_sd_offset', 1.0, dims=('User', 'Direction'))
#user_sd = pm.Deterministic('user_sd', global_sigma + user_sd_offset * global_sigma_sd, dims=('User', 'Direction'))
# Expected deviation per user and direction:
theta = tt.stack((user_sd[user_idx, 0, block_idx], user_sd[user_idx, 1, block_idx])).T
projection = pm.Normal('projection', mu=0, sigma=theta, observed=df[directions], dims=('obs_id', 'Direction'))
# %%
# Show model graph.
pm.model_to_graphviz(interaction_model)
# %%
# Prior predictive check.
with interaction_model:
interaction_prior = pm.sample_prior_predictive(700)
interaction_idata = az.from_pymc3(prior=interaction_prior)
#az.plot_ppc(interaction_idata, group="prior")
# %%
# Sample from posterior.
with interaction_model:
interaction_model.name = "Interaction Direction x Block Model"
interaction_trace = pm.sample(1000, tune=1200, target_accept=0.99)
# The dimensions and coordinates are lost during sampling.
# pm.MultiTrace objects do not store the labeled coordinates of their variables.
# But we can infer the coordinates from the model.
idata_aux = az.from_pymc3(interaction_trace, model=interaction_model)
# Add the posterior to our prior information.
interaction_idata.extend(idata_aux)
print_divergencies(interaction_trace)
# %%
az.plot_posterior(interaction_idata, var_names=['global_sigma'])
# %%
az.plot_pair(interaction_idata, var_names=['global_sigma'], filter_vars=None, divergences=True)
# %%
az.summary(interaction_idata)
# %%
az.plot_trace(interaction_idata, compact=True, chain_prop={"ls": "-"})
# %%
# Posterior predictive check.
with interaction_model:
interaction_post_pred = pm.sample_posterior_predictive(interaction_idata)
idata_aux = az.from_pymc3(posterior_predictive=interaction_post_pred)
interaction_idata.extend(idata_aux)
# %%
az.loo(interaction_idata)
# %%
plot_ppc_loopit(interaction_idata, interaction_model.name)
# %%
# Hierarchical Model Blocks -> Direction -> User #
##################################################
with pm.Model(coords=coords) as hierarchical_model:
user_idx = pm.Data('user_idx', user_indices, dims='obs_id')
block_idx = pm.Data('block_idx', block_indices, dims='obs_id')
# Hyperpriors:
global_sigma_block = pm.Gamma('global_sigma_block', mu=5, sigma=5, dims='Block')
global_sigma_block_sd = pm.HalfCauchy('global_sigma_block_sd', 4.0, dims='Block')
global_sigma_direction = pm.Gamma('global_sigma_dir', mu=global_sigma_block, sigma=global_sigma_block_sd, dims=('Direction', 'Block'))
global_sigma_direction_sd = pm.HalfCauchy('global_sigma_dir_sd', 4.0, dims=('Direction', 'Block'))
# Varying user variability.
# Centered paramterization.
user_sd = pm.Gamma('user_sd', mu=global_sigma_direction, sigma=global_sigma_direction_sd, dims=('User', 'Direction', 'Block'))
# Expected deviation per user and direction:
theta = tt.stack((user_sd[user_idx, 0, block_idx], user_sd[user_idx, 1, block_idx])).T
projection = pm.Normal('projection', mu=0, sigma=theta, observed=df[directions], dims=('obs_id', 'Direction'))
# %%
# Show model graph.
pm.model_to_graphviz(hierarchical_model)
# %%
# Prior predictive check.
with hierarchical_model:
hierarchical_prior = pm.sample_prior_predictive(700)
hierarchical_idata = az.from_pymc3(prior=hierarchical_prior)
# %%
# Sample from posterior.
with hierarchical_model:
hierarchical_model.name = "Hierarchical Block x Direction Model"
hierarchical_trace = pm.sample(1000, tune=1200, target_accept=0.99)
# The dimensions and coordinates are lost during sampling.
# pm.MultiTrace objects do not store the labeled coordinates of their variables.
# But we can infer the coordinates from the model.
idata_aux = az.from_pymc3(hierarchical_trace, model=hierarchical_model)
# Add the posterior to our prior information.
hierarchical_idata.extend(idata_aux)
print_divergencies(hierarchical_trace)
# %%
az.plot_posterior(hierarchical_idata, var_names=['global_sigma_dir'], filter_vars=None)
# %%
# non-centered Hierarchical Model #
# Blocks -> Direction -> User #
###################################
with pm.Model(coords=coords) as hierarchical_model_nc:
user_idx = pm.Data('user_idx', user_indices, dims='obs_id')
block_idx = pm.Data('block_idx', block_indices, dims='obs_id')
# Hyperpriors:
global_sigma_block = pm.Gamma('global_sigma_block', mu=5, sigma=5, dims='Block')
global_sigma_block_sd = pm.HalfCauchy('global_sigma_block_sd', 4.0, dims='Block')
# A potentially negative offset is problematic if it pushes beyond 0.
# A strictly positive offset is problematic, because it biases the estimate.
sigma_direction_offset = pm.Exponential('sigma_dir_offset', 1.0, dims=('Direction', 'Block'))
sigma_direction = pm.Deterministic('sigma_dir', global_sigma_block + sigma_direction_offset * global_sigma_block_sd, dims=('Direction', 'Block'))
sigma_direction_sd = pm.HalfCauchy('sigma_dir_sd', 4.0, dims=('Direction', 'Block'))
# Varying user variability.
# Non-centered reparameterization.
user_sd_offset = pm.Exponential('user_sd_offset', 1.0, dims=('User', 'Direction', 'Block'))
user_sd = pm.Deterministic('user_sd', sigma_direction + user_sd_offset * sigma_direction_sd, dims=('User', 'Direction', 'Block'))
# Expected deviation per user and direction:
theta = tt.stack((user_sd[user_idx, 0, block_idx], user_sd[user_idx, 1, block_idx])).T
projection = pm.Normal('projection', mu=0, sigma=theta, observed=df[directions], dims=('obs_id', 'Direction'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment