Skip to content

Instantly share code, notes, and snippets.

@amirkdv
Last active June 8, 2018 05:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amirkdv/26c4ed8bf89bb26e1530467187a46f9a to your computer and use it in GitHub Desktop.
Save amirkdv/26c4ed8bf89bb26e1530467187a46f9a to your computer and use it in GitHub Desktop.
Statistical Analysis of Coherence in LFP recordings
#!/usr/bin/env python
import sys
import numpy as np
from numpy.fft import rfft, rfftfreq, irfft
from matplotlib import pyplot as plt
from scipy.signal import csd, welch
# install via `pip install git+https://github.com/aaren/wavelets`
from wavelets import WaveletAnalysis
# install via `pip install statsmodels`
from statsmodels.tsa.vector_ar.var_model import VAR
from statsmodels.tsa.ar_model import AR
plt.rcParams.update({'text.latex.preamble' : [r'\usepackage{amsmath}']})
plt.rc('text', usetex=True)
#==============================================================================
# LFP Simulation
#==============================================================================
# returns a guassian noise signal of given length and standard deviation.
def noise(length, A):
return A * np.random.randn(length)
# reconstructs the time domain representation of a singal based on its power
# spectrum. The signal amplitude is rescaled such that its range (i.e max -
# min) is 2A. If phases are not given, they are chosen randomly from a Gaussian
# distribution centered at 0 with standard deviation 2pi.
def signal_from_spectrum(t, powers, A, phases=None):
if phases is not None:
assert len(phases) == len(powers)
else:
phases = 2 * np.pi * np.random.randn(len(powers))
fourier = np.multiply(
np.sqrt(2 * powers), # amplitudes at each frequency
np.exp(1j * phases) # random phase
)
np.insert(fourier, 0, 0)
x = irfft(fourier, len(t))
x *= 2 * A / (max(x) - min(x))
return x
# returns a numpy array representing a function f(x) defined to be 1 (or
# amplitude) over the support (inclusive tuple of x range) and 0 elsewhere.
def bump_function(x, support, amplitude=1):
f = np.zeros(len(x))
in_support = lambda _x: _x >= support[0] and _x <= support[1]
return np.array([amplitude if in_support(_x) else 0 for _x in x])
# returns the power spectrum of a 1/f^alpha signal (optionally bandpassed).
def lfp_power(t, dt, band=None, alpha=1):
f_range = rfftfreq(t.size, d=dt)
if band:
in_band = lambda f: f >= band[0] and f <= band[1]
power = np.array([1. / f ** alpha if f > 0 and in_band(f) else 0 for f in f_range])
else:
power = np.array([1. / f ** alpha if f > 0 else 0 for f in f_range])
return power
# returns a pair of simulated LFP signals with 1/f^alpha power spectrum
# (optionally bandpassed; cf. lfp_power()). Additionally power bumps can be
# added to the default power spectrum leading to oscillationas with specified
# time and frequency range. A gaussian noise will be superimposed with standard
# deviation dictated by SNR (use inf to make noise vanish).
def two_channel_lfp(t, dt, A, snr=20, power_bumps=None, band=None):
powers = lfp_power(t, dt, band=band)
X = signal_from_spectrum(t, powers, A)
Y = signal_from_spectrum(t, powers, A)
f_range = rfftfreq(t.size, d=dt)
noise_amplitude = A / np.sqrt(snr)
if noise_amplitude:
X += noise(len(t), noise_amplitude)
Y += noise(len(t), noise_amplitude)
if not power_bumps:
return X, Y
for power_bump in power_bumps:
b_powers = bump_function(f_range, power_bump['freq'])
if 'phase_shift' in power_bump:
# phase lock the two signals with given shift
phases_x = 2 * np.pi * np.random.randn(len(f_range))
phases_y = phases_x + power_bump['phase_shift']
if 'phase_shift_sd' in power_bump:
# add noise to phases
phases_y += power_bump['phase_shift_sd'] * np.random.randn(len(f_range))
bump_x = signal_from_spectrum(t, b_powers, power_bump['amplitude'], phases=phases_x)
bump_y = signal_from_spectrum(t, b_powers, power_bump['amplitude'], phases=phases_y)
else:
bump_x = signal_from_spectrum(t, b_powers, power_bump['amplitude'])
bump_y = signal_from_spectrum(t, b_powers, power_bump['amplitude'])
start, end = power_bump['time']
if 'start_sd' in power_bump:
start += power_bump['start_sd'] * np.random.randn()
start_idx, end_idx = int(start / dt), int(end / dt)
in_t_range = lambda idx: idx >= start_idx and idx <= end_idx
X += np.array([b if in_t_range(idx) else 0 for idx, b in enumerate(bump_x)])
Y += np.array([b if in_t_range(idx) else 0 for idx, b in enumerate(bump_y)])
return X, Y
# returns multiple trials of single channel LFP recordings; all arguments are
# passed as is to two_channel_lfp (second channel always ignored)
def one_channel_lfp_n(n_trials, *args, **kwargs):
return np.array([two_channel_lfp(*args, **kwargs)[0] for _ in range(n_trials)])
# returns multiple trials of two channel LFP recordings; all arguments are
# passed as is to two_channel_lfp.
def two_channel_lfp_n(n_trials, *args, **kwargs):
nX = [[]] * n_trials
nY = [[]] * n_trials
for i in range(n_trials):
nX[i], nY[i] = two_channel_lfp(*args, **kwargs)
return np.array(nX), np.array(nY)
#==============================================================================
# Time Domain Analyses
#==============================================================================
# Calculates the distance between C(ref, ref+s) and C(t, t+s) for all t. This
# serves as a measure of variation in autocovariance function over time.
def autocov_distance(ref, t, T, dt, nX, scatter=None, ctr_step=5e-3):
mX = np.mean(nX, axis=0)
ref_idx = int(ref / dt)
cov_ref = np.array([np.mean(nX[:,ref_idx] * nX[:,_t]) for _t in range(len(t))])
ctrs = np.arange(ref + 20e-3, T - 20e-3, ctr_step)
dists = np.zeros(len(ctrs))
for idx, ctr in enumerate(ctrs):
print ref, ctr
ctr_idx = int(ctr / dt)
cov_ctr = [np.mean(nX[:,ctr_idx] * nX[:,_t]) - mX[ctr_idx] * mX[_t]
for _t in range(len(t))]
_cov_ctr = cov_ctr[ctr_idx - ref_idx:]
_cov_ref = cov_ref[:len(t) - ctr_idx + ref_idx]
support = T - (ctr - ref)
dists[idx] = np.sum(np.abs(_cov_ref - _cov_ctr)) / support
if scatter:
c = np.random.rand(3, 1)
scatter.scatter(1000 * (t - ctr), cov_ctr, alpha=.2, facecolor=c, edgecolor=c, s=.2)
dists = (dists - min(dists)) / (max(dists) - min(dists))
return ctrs, dists
# Fits a multivariate autoregressive model (where the multiple dimensions
# represent different trials) to the given single channel data over a rolling
# window. At each window center the log(min(root)) of the characteristic
# polynomial of the AR model is reported as a stability (stationarity) index.
def AR_root_test(nX, dt, T, ax, c, win_len=20e-3, ctr_step=5e-3):
win_ctr_ts = np.arange(win_len, T - 50e-3, ctr_step)
#win_ctr_ts = np.arange(70e-3, 210e-3, ctr_step)
sys.stderr.write('single trial AR #/%d: ' % len(nX))
# single trial AR model
for idx, X in enumerate(nX):
stability = []
sys.stderr.write('%d ' % (idx + 1))
for ctr_t in win_ctr_ts:
ctr_t_idx = int((ctr_t - win_len / 2) / dt)
max_t_idx = int((ctr_t + win_len / 2) / dt)
ar_res = AR(X[ctr_t_idx:max_t_idx]).fit(5)
stability_idx = np.log(min(np.abs(ar_res.roots)))
stability.append(stability_idx)
ax.plot(1000 * win_ctr_ts, stability, alpha=.2, c=c)
sys.stderr.write('\n')
# multiple trial VAR model
sys.stderr.write('multiple trial VAR:\n')
stability = []
for ctr_t in win_ctr_ts:
ctr_t_idx = int((ctr_t - win_len / 2) / dt)
max_t_idx = int((ctr_t + win_len / 2) / dt)
ar_res = VAR(nX[:,ctr_t_idx:max_t_idx]).fit(5)
stability_idx = np.log(min(np.abs(ar_res.roots)))
sys.stderr.write(' (%.0f, %.0f): %.2f \n' % (1000 * (ctr_t - win_len / 2.), 1000 * (ctr_t + win_len / 2.), stability_idx))
stability.append(stability_idx)
sys.stderr.write('\n')
ax.plot(1000 * win_ctr_ts, stability, c=c, lw=4, alpha=.7)
#==============================================================================
# Frequency Domain Analyses
#==============================================================================
# Estimates the coherence by estimating spectral properties (cross-spectral
# density and individual spectral densities) by averaging over trials.
def coherence_with_spectral_avg(nX, nY, dt, ax, c):
assert len(nX) == len(nY)
n_trials = len(nX)
sxy = np.array([csd(nX[i], nY[i], fs=1./dt)[1] for i in range(n_trials)])
sxx = np.array([welch(X, fs=1./dt)[1] for X in nX])
syy = np.array([welch(Y, fs=1./dt)[1] for Y in nY])
f, _ = csd(nX[0], nX[0], fs=1./dt)
for i in range(len(sxy)):
ax.plot(f, (np.abs(sxy[i]) ** 2) / (sxx[i] * syy[i]), c=c, alpha=.1, lw=1)
# |<S_xy>|^2
coherence = (np.abs(np.mean(sxy, axis=0)) ** 2) / (np.mean(sxx, axis=0) * np.mean(syy, axis=0))
#coherence = np.abs(np.mean(sxy / (np.sqrt(sxx) * np.sqrt(syy)), axis=0)) ** 2
return f, coherence
# <|S_xy|^2>
# NOTE trial averaged coherence too depends on phase locking because the
# cross-spectrals are added (the correct way above) and if the phase
# difference is not constant; they cancel each other out. However, in a
# non-phase locked coherence across trials:
coherence = np.mean(np.abs(sxy) ** 2, axis=0) / (np.mean(sxx, axis=0) * np.mean(syy, axis=0))
return f, coherence
def wavelet_plv(nX, nY, dt):
assert len(nX) == len(nY)
n_trials = len(nX)
phase_diffs = None
sys.stderr.write(' # ')
for i in range(n_trials):
sys.stderr.write('%d ' % (i + 1))
wa_x = WaveletAnalysis(nX[i], dt=dt)
wa_y = WaveletAnalysis(nY[i], dt=dt)
diff = np.angle(wa_x.wavelet_transform) - np.angle(wa_y.wavelet_transform)
if phase_diffs is None:
phase_diffs = np.zeros((n_trials,) + diff.shape)
phase_diffs[i] = diff
sys.stderr.write('\n')
return phase_diffs, wa_x.time, 1. / wa_x.fourier_periods
def wavelet_coherence(nX, nY, dt):
assert len(nX) == len(nY)
n_trials = len(nX)
s_xx, s_yy, s_xy = None, None, None
sys.stderr.write(' # ')
for i in range(n_trials):
sys.stderr.write('%d ' % (i + 1))
wa_x = WaveletAnalysis(nX[i], dt=dt)
wa_y = WaveletAnalysis(nY[i], dt=dt)
X_w, Y_w = wa_x.wavelet_transform, wa_y.wavelet_transform
if s_xx is None:
shape = (n_trials,) + X_w.shape
s_xx, s_yy = np.zeros(shape), np.zeros(shape)
# add 0j to make the array a complex object; cf. http://stackoverflow.com/a/11965466
s_xy = np.zeros(shape) + 0j
s_xx[i] = np.abs(X_w) ** 2
s_yy[i] = np.abs(Y_w) ** 2
s_xy[i] = np.multiply(np.conj(X_w), Y_w)
sys.stderr.write('\n')
return s_xx, s_yy, s_xy, wa_x.time, 1. / wa_x.fourier_periods
#==============================================================================
# Plotting helpers
#==============================================================================
def setup_axis(ax, xlabel, ylabel):
ax.grid(True)
ax.xaxis.set_label_coords(.9, -.07)
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel, rotation='vertical')
return ax
def highlight_power_bumps(ax, power_bumps, c='b'):
for bump in power_bumps:
start, end = 1000 * bump['time'][0], 1000 * bump['time'][1]
ax.axvspan(start, end, alpha=0.1, color=c)
#==============================================================================
# Experiments
#==============================================================================
def plot_sample_lfp(path):
T = 400e-3
dt = 10e-5
A = 30
t = np.arange(0, T, dt)
fig = plt.figure(figsize=(16, 12))
def _plot(ax_lfp, ax_pow, bumps):
X, _ = two_channel_lfp(t, dt, A, snr=20, power_bumps=bumps)
ax_lfp.plot(1000 * t, X, c='k', alpha=.7)
highlight_power_bumps(ax_lfp, bumps)
f, power = rfftfreq(X.size, d=dt), np.abs(rfft(X)) ** 2
ax_pow.plot(f, power, alpha=.7, c='k', lw=2)
ax_pow.set_xscale('log')
ax_pow.set_yscale('log')
if bumps:
ax_pow.axvspan(*bumps[0]['freq'], alpha=0.3, color='b')
f, power = welch(X, fs=1./dt)
ax_pow.plot(f, power, alpha=.7, c='r', lw=2)
bumps = []
ax_lfp = setup_axis(fig.add_subplot(2, 2, 1), 'time (ms)', 'LFP ($\mu V$)')
ax_pow = setup_axis(fig.add_subplot(2, 2, 3), 'frequency (Hz)', 'power density')
_plot(ax_lfp, ax_pow, bumps)
bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 35), 'start_sd': 0, 'amplitude': 10}]
ax_lfp = setup_axis(fig.add_subplot(2, 2, 2), 'time (ms)', 'LFP ($\mu V$)')
ax_pow = setup_axis(fig.add_subplot(2, 2, 4), 'frequency (Hz)', 'power density')
_plot(ax_lfp, ax_pow, bumps)
fig.savefig(path, dpi=500)
sys.stderr.write('saved plot to %s.\n' % path)
def plot_stationarity_measures(path):
T = 400e-3
dt = 10e-5
A = 30
t = np.arange(0, T, dt)
# NOTE discovery: if bump amplitude is ~ 10 => way less reliable
bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 35), 'start_sd': 5e-3, 'amplitude': 10}]
#bumps = [{'time': ( 50e-3, 150e-3), 'freq': (30, 35), 'start_sd': 0, 'amplitude': 30},
#{'time': (200e-3, 300e-3), 'freq': (60, 65), 'start_sd': 10e-3, 'amplitude': 30}]
n_trials = 1000
nX = one_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps)
fig = plt.figure(figsize=(10, 16))
mX = np.mean(nX, axis=0)
ax = setup_axis(fig.add_subplot(4, 1, 1), 'time (ms)', 'LFP ($\mu V$)')
ax.plot(1000 * t, nX[0], alpha=.7, c='k')
ax.plot(1000 * t, mX, alpha=.7, c='g', lw=3)
highlight_power_bumps(ax, bumps)
ax = setup_axis(fig.add_subplot(4, 1, 2), 'time (ms)', 'LFP Variance')
vX = np.var(nX, axis=0)
ax.plot(1000 * t, vX, alpha=.7, c='k')
ax = setup_axis(fig.add_subplot(4, 1, 4),
'lag $\\tau$ (ms)', '$C(t_\circ, t_\circ + \\tau)$')
ctrs, dists = autocov_distance(0, t, T, dt, nX, scatter=ax, ctr_step=5e-3)
ax = setup_axis(fig.add_subplot(4, 1, 3),
'$t_\circ$ (ms)',
'$\\big \\lVert C(0, \cdot) - C(t_\circ, \cdot) \\big \\rVert_1$')
ax.plot(1000 * ctrs, dists, c='k', alpha=.7, lw=2)
ax.set_xlim(0, 1000 * T)
ax.set_ylim(0, 1.1)
fig.savefig(path, dpi=500)
sys.stderr.write('saved plot to %s.\n' % path)
def plot_ar_stability_measure(path):
T = 400e-3
dt = 25e-5
A = 30
t = np.arange(0, T, dt)
n_trials = 15
fig = plt.figure(figsize=(20, 8))
win_len = 30e-3
ctr_step = 5e-3
# NOTE discovery: if bump amplitude is ~ 10 => way less reliable
#bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': 30}]
bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': 10}]
nX = one_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps)
ax = setup_axis(fig.add_subplot(2, 2, 1), 'time (ms)', 'LFP ($\mu V$)')
ax.plot(1000 * t, nX[0], c='k', alpha=.7)
highlight_power_bumps(ax, bumps)
ax = setup_axis(fig.add_subplot(2, 2, 3, sharex=ax),
'center of %.1f ms window' % (1000 * win_len),
'Stability Index ($\log \min |\lambda_i|$)')
AR_root_test(nX, dt, T, ax, 'k', win_len=win_len, ctr_step=ctr_step)
ax.set_ylim(min(0, ax.get_ylim()[0]) - .1, None)
ax.axhline(0, ax.get_xlim()[0], ax.get_xlim()[1], c='r', lw=4, alpha=.4)
# -----------
# without power bumps
# -----------
nX_stationary = one_channel_lfp_n(n_trials, t, dt, A)
ax = setup_axis(fig.add_subplot(2, 2, 2), 'time (ms)', 'LFP ($\mu V$)')
ax.plot(1000 * t, nX_stationary[0], c='k', alpha=.7)
ax = setup_axis(fig.add_subplot(2, 2, 4, sharex=ax),
'center of %.1f ms window' % (1000 * win_len),
'Stability Index ($\log \min |\lambda_i|$)')
AR_root_test(nX_stationary, dt, T, ax, 'k', win_len=win_len, ctr_step=ctr_step)
ax.set_ylim(min(0, ax.get_ylim()[0]) - .1, None)
ax.axhline(0, ax.get_xlim()[0], ax.get_xlim()[1], c='r', lw=4, alpha=.4)
fig.savefig(path, dpi=500)
sys.stderr.write('saved plot to %s.\n' % path)
def plot_coherence_measures(path):
T = 400e-3
dt = 10e-5
A = 30
t = np.arange(0, T, dt)
n_trials = 100
fig = plt.figure(figsize=(16, 12))
# NOTE assumes there is only one bump
def _plot(bumps, ax_traces, ax_coherence):
nX, nY = two_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps)
ax_traces[0].plot(1000 * t, nX[0], c='k', alpha=.5)
ax_traces[0].plot(1000 * t, nY[0], c='m', alpha=.5)
highlight_power_bumps(ax_traces[0], bumps)
ax_traces[1].plot(1000 * t, nX[1], c='k', alpha=.5)
ax_traces[1].plot(1000 * t, nY[1], c='m', alpha=.5)
highlight_power_bumps(ax_traces[1], bumps)
ax_coherence.axvspan(*bumps[0]['freq'], alpha=0.3, color='b')
f, coh = coherence_with_spectral_avg(nX, nY, dt, ax_coherence, 'r')
ax_coherence.plot(f, coh, c='r', lw=5, alpha=.7)
# coherence only over bump
bump_t = bumps[0]['time']
nX = np.array([X[int(bump_t[0] / dt):int(bump_t[1] / dt)] for X in nX])
nY = np.array([Y[int(bump_t[0] / dt):int(bump_t[1] / dt)] for Y in nY])
f, coh = coherence_with_spectral_avg(nX, nY, dt, ax_coherence, 'g')
# add some noise to coherence plots so both red and green coherence in
# non-phase locked example are visible
coh += np.max(np.array([np.zeros(len(coh)), .01 * np.random.randn(len(coh))]))
ax_coherence.plot(f, coh, c='g', lw=6, alpha=.7)
ax_coherence.set_xlim(0, 250)
ax_coherence.set_ylim(-.1, 1)
bump_A = 30
#bump_A = 5
# -----------------
# with phase locking
# -----------------
bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': bump_A, 'phase_shift': np.pi / 2.}]
ax_traces = [None, None]
ax_traces[0] = setup_axis(fig.add_subplot(3, 2, 1), 'time (ms)', 'LFP ($\mu V$)')
ax_traces[1] = setup_axis(fig.add_subplot(3, 2, 3), 'time (ms)', 'LFP ($\mu V$)')
ax_coherence = setup_axis(fig.add_subplot(3, 2, 5), 'frequency (Hz)', '$|\widehat{\gamma}_{xy}|^2$')
_plot(bumps, ax_traces, ax_coherence)
# -----------------
# without phase locking
# -----------------
bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': bump_A}]
ax_traces = [None, None]
ax_traces[0] = setup_axis(fig.add_subplot(3, 2, 2), 'time (ms)', 'LFP ($\mu V$)')
ax_traces[1] = setup_axis(fig.add_subplot(3, 2, 4), 'time (ms)', 'LFP ($\mu V$)')
ax_coherence = setup_axis(fig.add_subplot(3, 2, 6), 'frequency (Hz)', '$|\widehat{\gamma}_{xy}|^2$')
_plot(bumps, ax_traces, ax_coherence)
fig.savefig(path, dpi=500)
sys.stderr.write('saved plot to %s.\n' % path)
def plot_wavelet_plv(path):
T = 400e-3
dt = 10e-5
A = 30
t = np.arange(0, T, dt)
fig = plt.figure(figsize=(12, 12))
n_trials = 100
# -----------------
# with phase locking
# -----------------
bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': 5, 'phase_shift': 1}]
nX, nY = two_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps)
sys.stderr.write('%d phase-locked trials' % n_trials)
phase_diffs, times, freqs = wavelet_plv(nX, nY, dt)
times, freqs = np.meshgrid(1000 * times, freqs)
# first 5 trials only
ax = setup_axis(fig.add_subplot(2, 2, 1), 'time (ms)', 'frequency (Hz)')
plv = np.abs(np.mean(np.exp(1j * phase_diffs[:5]), axis=0))
ax.contourf(times, freqs, plv, 100)
ax.set_ylim(0, 250)
# all trials
ax = setup_axis(fig.add_subplot(2, 2, 3), 'time (ms)', 'frequency (Hz)')
plv = np.abs(np.mean(np.exp(1j * phase_diffs), axis=0))
ax.contourf(times, freqs, plv, 100)
ax.set_ylim(0, 250)
# -----------------
# without phase locking
# -----------------
bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': 5}]
nX, nY = two_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps)
sys.stderr.write('%d non-phase-locked trials' % n_trials)
phase_diffs, times, freqs = wavelet_plv(nX, nY, dt)
times, freqs = np.meshgrid(1000 * times, freqs)
# first 5 trials only
ax = setup_axis(fig.add_subplot(2, 2, 2), 'time (ms)', 'frequency (Hz)')
plv = np.abs(np.mean(np.exp(1j * phase_diffs[:5]), axis=0))
ax.contourf(times, freqs, plv, 100)
ax.set_ylim(0, 250)
# all trials
ax = setup_axis(fig.add_subplot(2, 2, 4), 'time (ms)', 'frequency (Hz)')
plv = np.abs(np.mean(np.exp(1j * phase_diffs), axis=0))
contours = ax.contourf(times, freqs, plv, 100)
ax.set_ylim(0, 250)
fig.subplots_adjust(right=0.8)
fig.colorbar(contours, cax=fig.add_axes([0.85, 0.35, 0.01, 0.3]))
fig.savefig(path, dpi=500)
sys.stderr.write('saved plot to %s.\n' % path)
def plot_wavelet_coherence(path):
T = 400e-3
dt = 10e-5
A = 30
t = np.arange(0, T, dt)
fig = plt.figure(figsize=(12, 12))
n_trials = 100
# -----------------
# with phase locking
# -----------------
bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': 5, 'phase_shift': 1}]
nX, nY = two_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps)
sys.stderr.write('%d phase-locked trials' % n_trials)
s_xx, s_yy, s_xy, times, freqs = wavelet_coherence(nX, nY, dt)
times, freqs = np.meshgrid(1000 * times, freqs)
# first 5 trials only
ax = setup_axis(fig.add_subplot(2, 2, 1), 'time (ms)', 'frequency (Hz)')
coh = (np.abs(np.mean(s_xy[:5], axis=0)) ** 2) / (np.mean(s_xx[:5], axis=0) * np.mean(s_yy[:5], axis=0))
ax.contourf(times, freqs, coh, 100)
ax.set_ylim(0, 250)
# all trials
ax = setup_axis(fig.add_subplot(2, 2, 3), 'time (ms)', 'frequency (Hz)')
coh = (np.abs(np.mean(s_xy, axis=0)) ** 2) / (np.mean(s_xx, axis=0) * np.mean(s_yy, axis=0))
ax.contourf(times, freqs, coh, 100)
ax.set_ylim(0, 250)
# -----------------
# without phase locking
# -----------------
bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': 5}]
nX, nY = two_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps)
sys.stderr.write('%d non-phase-locked trials' % n_trials)
s_xx, s_yy, s_xy, times, freqs = wavelet_coherence(nX, nY, dt)
times, freqs = np.meshgrid(1000 * times, freqs)
# first 5 trials only
ax = setup_axis(fig.add_subplot(2, 2, 2), 'time (ms)', 'frequency (Hz)')
coh = (np.abs(np.mean(s_xy[:5], axis=0)) ** 2) / (np.mean(s_xx[:5], axis=0) * np.mean(s_yy[:5], axis=0))
ax.contourf(times, freqs, coh, 100)
ax.set_ylim(0, 250)
# all trials
ax = setup_axis(fig.add_subplot(2, 2, 4), 'time (ms)', 'frequency (Hz)')
coh = (np.abs(np.mean(s_xy, axis=0)) ** 2) / (np.mean(s_xx, axis=0) * np.mean(s_yy, axis=0))
contours = ax.contourf(times, freqs, coh, 100)
ax.set_ylim(0, 250)
fig.subplots_adjust(right=0.8)
fig.colorbar(contours, cax=fig.add_axes([0.85, 0.35, 0.01, 0.3]))
fig.savefig(path, dpi=500)
sys.stderr.write('saved plot to %s.\n' % path)
def plot_example_resultant(path):
def _plot_resultant(ax, n, phase_mean, phase_sd):
vectors = np.exp(1j * (phase_mean + phase_sd * np.random.randn(n)))
quiver_kw = {
'angles': 'xy',
'scale_units': 'xy',
'scale': 1
}
ax.quiver(np.zeros(n), np.zeros(n), vectors.real, vectors.imag,
width=5e-3, alpha=.2, color='k', **quiver_kw)
resultant = np.mean(vectors)
ax.quiver(np.zeros(1), np.zeros(1), resultant.real, resultant.imag,
width=10e-3, alpha=.7, color='r', **quiver_kw)
ax.set_aspect('equal')
ax.set_xlim(-1.1, 1.1)
ax.set_ylim(-1.1, 1.1)
# draw the unit circle
t = np.linspace(0, 2 * np.pi, 1000)
ax.plot(np.cos(t), np.sin(t), c='b', lw=2, alpha=.2)
phase_mean = np.pi / 4.
n = 500
fig = plt.figure(figsize=(12, 6))
ax = setup_axis(fig.add_subplot(1, 2, 1), 'Re', 'Im')
_plot_resultant(ax, n, phase_mean, .06 * np.pi)
ax = setup_axis(fig.add_subplot(1, 2, 2), 'Re', 'Im')
_plot_resultant(ax, n, phase_mean, .6 * np.pi)
fig.savefig(path, dpi=300)
sys.stderr.write('saved plot to %s.\n' % path)
def plot_morlet_wavelets(path):
fig = plt.figure(figsize=(12, 8))
ax_re = setup_axis(fig.add_subplot(2, 2, 1), 'time', 'Re\{$\psi(\\frac{t-\\tau}{s})$\}')
ax_im = setup_axis(fig.add_subplot(2, 2, 2), 'time', 'Im\{$\psi(\\frac{t-\\tau}{s})$\}')
t = np.linspace(-5, 5, 10000)
def wavelet(t, s, tau):
_t = (t - tau) / s
return np.pi ** -.25 * np.exp(-_t**2 / 2) * np.exp(-6j * _t)
w = wavelet(t, 1, 0)
ax_re.plot(t, w.real, 'k', lw=1, alpha=.9)
ax_im.plot(t, w.imag, 'k', lw=1, alpha=.9)
w = wavelet(t, 2, 0)
ax_re.plot(t, w.real, 'm', lw=4, alpha=.2)
ax_im.plot(t, w.imag, 'm', lw=4, alpha=.2)
ax_re = setup_axis(fig.add_subplot(2, 2, 3), 'time', 'Re\{$\psi(\\frac{t-\\tau}{s})$\}')
ax_im = setup_axis(fig.add_subplot(2, 2, 4), 'time', 'Im\{$\psi(\\frac{t-\\tau}{s})$\}')
w = wavelet(t, 1, 0)
ax_re.plot(t, w.real, 'k', lw=1, alpha=.9)
ax_im.plot(t, w.imag, 'k', lw=1, alpha=.9)
w = wavelet(t, 1, 2)
ax_re.plot(t, w.real, 'm', lw=4, alpha=.2)
ax_im.plot(t, w.imag, 'm', lw=4, alpha=.2)
fig.savefig(path, dpi=500)
if __name__ == '__main__':
plot_sample_lfp('lfp.png')
plot_stationarity_measures('stationarity.png')
plot_ar_stability_measure('AR-stability-index.png')
plot_coherence_measures('coherence.png')
plot_wavelet_plv('wavelet-plv.png')
plot_wavelet_coherence('wavelet-coherence.png')
plot_example_resultant('resultant.png')
plot_morlet('morlet.png')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment