Skip to content

Instantly share code, notes, and snippets.

@dengemann
Last active August 29, 2015 14:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save dengemann/2b3203394eccc471e4cf to your computer and use it in GitHub Desktop.
Save dengemann/2b3203394eccc471e4cf to your computer and use it in GitHub Desktop.
"""
========================================
Regression on continuous data (rER[P/F])
========================================
This demonstrates how rERPs/regressing the continuous data is a
generalisation of traditional averaging. If all preprocessing steps
are the same and if no overlap between epochs exists and if all
predictors are binary, regression is virtually identical to traditional
averaging.
If overlap exists and/or predictors are continuous, traditional averaging
is inapplicable, but regression can estimate, including those of
continuous predictors.
Note. This example is based on new code which may still not be
memory-optimized. Be careful when working with a small computer.
rERPs are described in:
Smith, N. J., & Kutas, M. (2015). Regression-based estimation of ERP
waveforms: II. Non-linear effects, overlap correction, and practical
considerations. Psychophysiology, 52(2), 169-189.
"""
# Authors: Jona Sassenhagen <jona.sassenhagen@gmail.de>
#
# License: BSD (3-clause)
import mne
from mne.datasets import spm_face
from mne.externals.six import string_types
import numpy as np
from scipy import linalg
from mne.evoked import EvokedArray
from mne.utils import _reject_data_segments, _get_fast_dot
from mne.io.pick import pick_types, pick_info
def linear_regression_raw(raw, events, event_id=None, tmin=-.1, tmax=1,
covariates=None, reject=None, flat=None, tstep=1.,
decim=1, picks=None, solver='pinv'):
"""Estimate regression-based evoked potentials/fields by linear modelling
of the full M/EEG time course, including correction for overlapping
potentials and allowing for continuous/scalar predictors. Internally, this
constructs a predictor matrix X of size n_samples * (n_conds * window
length), solving the linear system Y = bX and returning b as evoked-like
time series split by condition.
References
----------
Smith, N. J., & Kutas, M. (2015). Regression-based estimation of ERP
waveforms: II. Non-linear effects, overlap correction, and practical
considerations. Psychophysiology, 52(2), 169-189.
Parameters
----------
raw : instance of Raw
A raw object. Note: be very careful about data that is not
downsampled, as the resulting matrices can be enormous and easily
overload your computer. Typically, 100 Hz sampling rate is
appropriate - or using the decim keyword (see below).
events : ndarray of int, shape (n_events, 3)
An array where the first column corresponds to samples in raw
and the last to integer codes in event_id.
event_id : dict
As in Epochs; a dictionary where the values may be integers or
iterables of integers, corresponding to the 3rd column of
events, and the keys are condition names.
tmin : float | dict
If float, gives the lower limit (in seconds) for the time window for
which all event types' effects are estimated. If a dict, can be used to
specify time windows for specific event types: keys correspond to keys
in event_id and/or covariates; for missing values, the default (-.1) is
used.
tmax : float | dict
If float, gives the upper limit (in seconds) for the time window for
which all event types' effects are estimated. If a dict, can be used to
specify time windows for specific event types: keys correspond to keys
in event_id and/or covariates; for missing values, the default (1.) is
used.
covariates : dict-like | None
If dict-like (e.g., a pandas DataFrame), values have to be array-like
and of the same length as the columns in ```events```. Keys correspond
to additional event types/conditions to be estimated and are matched
with the time points given by the first column of ```events```. If
None, only binary events (from event_id) are used.
reject : None | dict
For cleaning raw data before the regression is performed: set up
rejection parameters based on peak-to-peak amplitude in continuously
selected subepochs. If None, no rejection is done.
If dict, keys are types ('grad' | 'mag' | 'eeg' | 'eog' | 'ecg')
and values are the maximal peak-to-peak values to select rejected
epochs. E.g. reject = dict(grad=4000e-12, # T / m (gradiometers)
mag=4e-11, # T (magnetometers)
eeg=40e-5, # uV (EEG channels)
eog=250e-5 # uV (EOG channels))
flat : None | dict
or cleaning raw data before the regression is performed: set up
rejection parameters based on flatness of the signal. If None, no
rejection is done. If a dict, keys are ('grad' | 'mag' |
'eeg' | 'eog' | 'ecg') and values are minimal peak-to-peak values to
select rejected epochs.
tstep : float
Length of windows for peak-to-peak detection for raw data cleaning.
decim : int
Decimate by choosing only a subsample of data points. Highly
recommended for data recorded at high sampling frequencies, as
otherwise huge intermediate matrices have to be created and inverted.
picks : None | list
List of indices of channels to be included. If None, defaults to all
MEG and EEG channels.
solver : str | function
Either a function which takes as its inputs the predictor matrix X
and the observation matrix Y, and returns the coefficient matrix b;
or a string (for now, only 'pinv'), in which case the solver used
is dot(scipy.linalg.pinv(dot(X.T, X)), dot(X.T, Y.T)).T.
Returns
-------
evokeds : dict
A dict where the keys correspond to conditions and the values are
Evoked objects with the ER[F/P]s. These can be used exactly like any
other Evoked object, including e.g. plotting or statistics.
"""
if isinstance(solver, string_types):
if solver == 'pinv':
fast_dot = _get_fast_dot()
# inv is slightly (~10%) faster, but pinv seemingly more stable
def solver(X, Y):
return fast_dot(linalg.pinv(fast_dot(X.T, X)),
fast_dot(X.T, Y.T)).T
else:
raise ValueError("No such solver: {0}".format(solver))
# prepare raw and events
if picks is None:
picks = pick_types(raw.info, meg=True, eeg=True, ref_meg=True)
info = pick_info(raw.info, picks, copy=True)
info["sfreq"] /= decim
data, times = raw[:]
data = data[picks, ::decim]
times = times[::decim]
events = events.copy()
events[:, 0] -= raw.first_samp
events[:, 0] /= decim
conds = list(event_id)
if covariates is not None:
conds += list(covariates)
with profile.timestamp('handle-events'):
# time windows (per event type) are converted to sample points from times
if isinstance(tmin, (float, int)):
tmin_s = dict((cond, int(tmin * info["sfreq"])) for cond in conds)
else:
tmin_s = dict((cond, int(tmin.get(cond, -.1) * info["sfreq"]))
for cond in conds)
if isinstance(tmax, (float, int)):
tmax_s = dict(
(cond, int((tmax * info["sfreq"]) + 1.)) for cond in conds)
else:
tmax_s = dict((cond, int((tmax.get(cond, 1.) * info["sfreq"]) + 1))
for cond in conds)
# Construct predictor matrix
# We do this by creating one array per event type, shape (lags, samples)
# (where lags depends on tmin/tmax and can be different for different
# event types). Columns correspond to predictors, predictors correspond to
# time lags. Thus, each array is mostly sparse, with one diagonal of 1s
# per event (for binary predictors).
# This should probably be improved (including making it more robust to
# high frequency data) by operating on sparse matrices. As-is, high
# sampling rates plus long windows blow up the system due to the inversion
# of massive matrices.
# Furthermore, assigning to a preallocated array would be faster.
n_samples = len(times)
samples = np.zeros(n_samples, dtype=float)
cond_length = dict()
all_n_lags = [int(tmax_s[cond] - tmin_s[cond]) for cond in conds]
X = np.empty((len(samples), sum(all_n_lags)), dtype=float)
with profile.timestamp('assemble-design-matrix'):
for i_cond, (cond, n_lags) in enumerate(zip(conds, all_n_lags)):
# create the first row and column to be later used by toeplitz to
# build the full predictor matrix
lags = np.zeros(n_lags, dtype=float)
tmin_ = tmin_s[cond]
if cond in event_id: # for binary predictors
ids = ([event_id[cond]]
if isinstance(event_id[cond], int)
else event_id[cond])
samples[events[np.in1d(events[:, 2], ids), 0] + int(tmin_)] = 1
cond_length[cond] = np.sum(np.in1d(events[:, 2], ids))
else: # for predictors from covariates, e.g. continuous ones
if len(covariates[cond]) != len(events):
error = """Condition {0} from ```covariates``` is
not the same length as ```events```""".format(cond)
raise ValueError(error)
for time, value in zip(events[:, 0], covariates[cond]):
samples[time + int(tmin_)] = float(value)
cond_length[cond] = len(np.nonzero(covariates[cond])[0])
# this is the magical part (thanks to Marijn van Vliet):
# use toeplitz to construct series of diagonals
X[:, i_cond * n_lags:(1 + i_cond) * n_lags] = linalg.toeplitz(
samples, lags)
has_val = np.where(np.any(X, axis=1))[0]
# additionally, reject positions based on extreme steps in the data
with profile.timestamp('reject'):
if reject is not None:
data, inds = _reject_data_segments(data, reject, flat, None,
info, tstep)
for t0, t1 in inds:
has_val[t0:t1] = False
# additionally, reject positions based on extreme steps in the data
X = X[has_val]
else:
X = X[has_val]
Y = data[:, has_val]
# additionally, reject positions based on extreme steps in the data
# solve linear system
with profile.timestamp('solver'):
coefs = solver(X, Y)
# construct Evoked objects to be returned from output
evokeds = dict()
cum = 0
# additionally, reject positions based on extreme steps in the data
with profile.timestamp('finalize-return'):
for cond in conds:
tmin_, tmax_ = tmin_s[cond], tmax_s[cond]
evokeds[cond] = EvokedArray(coefs[:, cum:cum + tmax_ - tmin_],
info=info, tmin=tmin_ / info["sfreq"],
comment=cond, nave=cond_length[cond],
kind='mean') # note that nave and kind are
cum += tmax_ - tmin_ # technically not correct
return evokeds
# Preprocess data
data_path = spm_face.data_path()
# Load and filter data, set up epochs
raw_fname = data_path + '/MEG/spm/SPM_CTF_MEG_example_faces1_3D_raw.fif'
raw = mne.io.Raw(raw_fname, preload=True) # Take first run
picks = mne.pick_types(raw.info, meg=True, exclude='bads')
raw.filter(1, 45, method='iir')
events = mne.find_events(raw, stim_channel='UPPT001')
event_id = dict(faces=1, scrambled=2)
tmin, tmax = -.1, .5
raw.pick_types(meg=True)
#
# # regular epoching
# epochs = mne.Epochs(raw, events, event_id, tmin, tmax, reject=None,
# baseline=None, preload=True, verbose=False, decim=1)
# rERF
evokeds = linear_regression_raw(raw, events=events, event_id=event_id,
reject=None, tmin=tmin, tmax=tmax,
decim=1)
# linear_regression_raw returns a dict of evokeds
# select conditions similarly to mne.Epochs objects
#
# # plot both results
# cond = "faces"
# fig, (ax1, ax2) = plt.subplots(1, 2)
# epochs[cond].average().plot(axes=ax1, show=False)
# evokeds[cond].plot(axes=ax2, show=False)
# ax1.set_title("Traditional averaging")
# ax2.set_title("rERF")
# plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment