Skip to content

Instantly share code, notes, and snippets.

@mwaskom
Created March 22, 2016 04:22
Show Gist options
  • Save mwaskom/a9b4ce98bea81ca28128 to your computer and use it in GitHub Desktop.
Save mwaskom/a9b4ce98bea81ca28128 to your computer and use it in GitHub Desktop.
Context-dependent proportional-rate diffusion model
from __future__ import division
import numpy as np
import pandas as pd
from scipy import stats
from scipy.optimize import minimize
def ddm_pc(x, k, A, lam):
    """Bernoulli likelihood given diffusion model parameters.

    Probability of a correct choice: the logistic choice function implied
    by the diffusion model, mixed with a lapse rate ``lam``.
    """
    p_correct = 1 / (1 + np.exp(-2 * A * k * x))
    return lam + (1 - 2 * lam) * p_correct
def ddm_rt(x, k, A, T0):
    """Likelihood of reaction time data given diffusion model parameters.

    Expected mean RT: decision time from the proportional-rate drift
    model plus the non-decision (residual) time ``T0``.
    """
    mu = k * x
    decision_time = (A / mu) * np.tanh(A * mu)
    return decision_time + T0
def ddm_rt_var(x, k, A, T0):
    """Variance of RT distributions given diffusion model parameters."""
    mu = x * k
    A_mu = A * mu
    # sech^2 term from differentiating tanh in the decision-time variance
    sech_sq = 1 / np.cosh(A_mu) ** 2
    decision_var = A * (np.tanh(A_mu) - A_mu * sech_sq) / mu ** 3
    return decision_var + .1 * T0
def lognormpdf(x, mu, sigma):
    """Log likelihood of normal distribution."""
    norm_const = np.log(1 / (np.sqrt(2 * np.pi) * sigma))
    return norm_const - (x - mu) ** 2 / (2 * sigma ** 2)
class PalmerDDM(object):
    """Context-dependent proportional-rate drift diffusion model.

    The original reference is:

    Palmer J, Huk AC, Shadlen MN, (2005). The effect of stimulus strength
    on the speed and accuracy of a perceptual decision. Journal of Vision
    5, 376–404.

    This class allows for diffusion model parameters to vary across subsets
    of the full dataset.

    Parameters
    ----------
    df : pandas.DataFrame
        Trial-level data; its columns are mapped onto the generic model
        variables (x, rt, pc, by) when ``fit`` is called.
    by_params : list of str, optional
        Subset of {"k", "A", "T0", "lam"} to fit separately for each
        level of the ``by`` variable passed to ``fit``.
    fit_lapse : bool, optional
        If True, fit a lapse-rate parameter ``lam``; otherwise fix it at 0.

    """
    def __init__(self, df, by_params=None, fit_lapse=True):

        self.df = df
        self.fit_lapse = fit_lapse

        # BUG FIX: by_keys was only assigned inside fit(), so accessing
        # param_names / init_params before fitting raised AttributeError.
        # Default to an empty list (no subsets) until fit() populates it.
        self.by_keys = []

        if by_params is None:
            self.by_params = []
        else:
            if set(by_params) - {"k", "A", "T0", "lam"}:
                raise ValueError("Bad split parameter specification")
            self.by_params = by_params

    @property
    def param_names(self):
        """Parameter names, possibly accounting for subsets."""
        base_names = ["k", "A", "T0"]
        if self.fit_lapse:
            base_names.append("lam")

        names = []
        for p in base_names:
            if p in self.by_params:
                # One copy of this parameter per level of the split variable
                for k in self.by_keys:
                    names.append(p + "_" + k)
            else:
                names.append(p)
        return names

    @property
    def init_params(self):
        """Initial guess for model parameters."""
        params = []
        for name in self.param_names:
            if name.startswith("k"):
                v = 12
            elif name.startswith("A"):
                v = .7
            elif name.startswith("T0"):
                v = .5
            elif name.startswith("lam"):
                v = .025
            params.append(v)
        return params

    def err_func(self, params, df):
        """Diffusion model negative log-likelihood for optimization.

        Parameters
        ----------
        params : sequence of float
            Parameter vector ordered as in ``param_names``.
        df : pandas.DataFrame
            Fitting data with generic columns ``x``, ``rt``, ``pc``, ``by``.

        Returns
        -------
        float
            Negative log-likelihood, combining the choice-accuracy and
            reaction-time terms.

        """
        assert len(params) == len(self.param_names)
        params = dict(zip(self.param_names, params))

        LL_pc = 0
        LL_rt = 0
        for i, (key, df_i) in enumerate(df.groupby("by")):

            # Assign parameters for these values, falling back on the
            # shared parameter when one is not split across subsets
            k = params.get("k_" + key, params.get("k"))
            A = params.get("A_" + key, params.get("A"))
            T0 = params.get("T0_" + key, params.get("T0"))
            lam = params.get("lam_" + key, params.get("lam", 0))

            # Compute likelihood of choice accuracy; the floor on the
            # error probability avoids log(0) during optimization
            expected_pc = ddm_pc(df_i.x, k, A, lam)
            pc_like = np.r_[expected_pc[df_i.pc],
                            np.maximum(1 - expected_pc[~df_i.pc], .001)]
            LL_pc += np.log(pc_like).sum()

            # Compute likelihood of reaction times, evaluating the binned
            # mean RT against the model prediction and its standard error.
            # Use string aggregator names (column labels are unchanged);
            # passing np.mean/np.size to agg is deprecated in pandas.
            rt_bins = df_i.groupby("x").rt.agg(["mean", "size"])
            x_vals = rt_bins.index.values
            expected_rt = ddm_rt(x_vals, k, A, T0)
            expected_rt_var = ddm_rt_var(x_vals, k, A, T0)
            expected_rt_sem = np.sqrt(expected_rt_var / rt_bins["size"])
            rt_like = df_i.x.map(pd.Series(lognormpdf(rt_bins["mean"],
                                                      expected_rt,
                                                      expected_rt_sem),
                                           index=rt_bins.index))
            LL_rt += rt_like.sum()

        return -(LL_pc + LL_rt)

    def fit(self, x="context_strength", rt="rt", pc="correct", by=None,
            method="nelder-mead"):
        """Fit the model.

        Parameters
        ----------
        x, rt, pc : str
            Names of the columns in ``self.df`` holding stimulus strength,
            reaction time, and choice accuracy (boolean), respectively.
        by : str, optional
            Column whose levels define subsets for the split parameters.
        method : str
            Optimization method passed to ``scipy.optimize.minimize``.

        Returns
        -------
        self
            The fitted model, with ``result_``, ``ll_``, ``params_``,
            ``success_``, and per-parameter ``<name>_`` attributes set.

        """
        if by is None and self.by_params:
            raise ValueError("Splitting params with no variable to split by")

        # Create a new generic DataFrame for fitting
        cols = [x, rt, pc]
        if by is not None:
            cols.append(by)
        fit_df = self.df[cols].copy()
        if by is None:
            fit_df["by"] = "_"
        fit_df.columns = ["x", "rt", "pc", "by"]
        self.by_keys = list(fit_df["by"].unique())
        self._fit_df = fit_df

        # Fit the model.
        # BUG FIX: minimize expects extra objective-function arguments as
        # a tuple; passing the DataFrame bare made scipy unpack/concatenate
        # it incorrectly instead of forwarding it to err_func.
        result = minimize(self.err_func, self.init_params, args=(fit_df,),
                          method=method)

        # Assign the results to object attributes
        self.result_ = result
        self.ll_ = -result["fun"]
        self.params_ = result["x"]
        self.success_ = result["success"]
        for name, val in zip(self.param_names, self.params_):
            setattr(self, name + "_", val)

        return self

    def predict_pc(self, x, key=None):
        """Return predicted choice accuracy given fitted values."""
        params = dict(zip(self.param_names, self.params_))
        k = params.get("k_" + str(key), params.get("k"))
        A = params.get("A_" + str(key), params.get("A"))
        if self.fit_lapse:
            lam = params.get("lam_" + str(key), params.get("lam"))
        else:
            lam = 0
        return ddm_pc(x, k, A, lam)

    def predict_rt(self, x, key=None):
        """Return predicted mean reaction time given fitted values."""
        params = dict(zip(self.param_names, self.params_))
        k = params.get("k_" + str(key), params.get("k"))
        A = params.get("A_" + str(key), params.get("A"))
        T0 = params.get("T0_" + str(key), params.get("T0"))
        return ddm_rt(x, k, A, T0)
def lr_test(m_A, m_0):
    """Perform a likelihood ratio test between fitted model objects.

    Parameters
    ----------
    m_A : fitted model
        Alternative (larger) model; must expose ``ll_`` and ``params_``.
    m_0 : fitted model
        Null (smaller) model nested within ``m_A``.

    Returns
    -------
    p : float
        Survival-function p value of 2 * (LL_A - LL_0) under a chi-squared
        distribution with dof equal to the difference in parameter counts.

    """
    chisqr = 2 * (m_A.ll_ - m_0.ll_)
    dof = len(m_A.params_) - len(m_0.params_)
    p = stats.chi2(dof).sf(chisqr)
    # BUG FIX: the computed p value was silently discarded (no return)
    return p
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment