#!/usr/bin/env python import sys import numpy as np from numpy.fft import rfft, rfftfreq, irfft from matplotlib import pyplot as plt from scipy.signal import csd, welch # install via pip install git+https://github.com/aaren/wavelets from wavelets import WaveletAnalysis # install via pip install statsmodels from statsmodels.tsa.vector_ar.var_model import VAR from statsmodels.tsa.ar_model import AR plt.rcParams.update({'text.latex.preamble' : [r'\usepackage{amsmath}']}) plt.rc('text', usetex=True) #============================================================================== # LFP Simulation #============================================================================== # returns a guassian noise signal of given length and standard deviation. def noise(length, A): return A * np.random.randn(length) # reconstructs the time domain representation of a singal based on its power # spectrum. The signal amplitude is rescaled such that its range (i.e max - # min) is 2A. If phases are not given, they are chosen randomly from a Gaussian # distribution centered at 0 with standard deviation 2pi. def signal_from_spectrum(t, powers, A, phases=None): if phases is not None: assert len(phases) == len(powers) else: phases = 2 * np.pi * np.random.randn(len(powers)) fourier = np.multiply( np.sqrt(2 * powers), # amplitudes at each frequency np.exp(1j * phases) # random phase ) np.insert(fourier, 0, 0) x = irfft(fourier, len(t)) x *= 2 * A / (max(x) - min(x)) return x # returns a numpy array representing a function f(x) defined to be 1 (or # amplitude) over the support (inclusive tuple of x range) and 0 elsewhere. def bump_function(x, support, amplitude=1): f = np.zeros(len(x)) in_support = lambda _x: _x >= support[0] and _x <= support[1] return np.array([amplitude if in_support(_x) else 0 for _x in x]) # returns the power spectrum of a 1/f^alpha signal (optionally bandpassed). def lfp_power(t, dt, band=None, alpha=1): f_range = rfftfreq(t.size, d=dt) if band: in_band = lambda f: f >= band[0] and f <= band[1] power = np.array([1. / f ** alpha if f > 0 and in_band(f) else 0 for f in f_range]) else: power = np.array([1. / f ** alpha if f > 0 else 0 for f in f_range]) return power # returns a pair of simulated LFP signals with 1/f^alpha power spectrum # (optionally bandpassed; cf. lfp_power()). Additionally power bumps can be # added to the default power spectrum leading to oscillationas with specified # time and frequency range. A gaussian noise will be superimposed with standard # deviation dictated by SNR (use inf to make noise vanish). def two_channel_lfp(t, dt, A, snr=20, power_bumps=None, band=None): powers = lfp_power(t, dt, band=band) X = signal_from_spectrum(t, powers, A) Y = signal_from_spectrum(t, powers, A) f_range = rfftfreq(t.size, d=dt) noise_amplitude = A / np.sqrt(snr) if noise_amplitude: X += noise(len(t), noise_amplitude) Y += noise(len(t), noise_amplitude) if not power_bumps: return X, Y for power_bump in power_bumps: b_powers = bump_function(f_range, power_bump['freq']) if 'phase_shift' in power_bump: # phase lock the two signals with given shift phases_x = 2 * np.pi * np.random.randn(len(f_range)) phases_y = phases_x + power_bump['phase_shift'] if 'phase_shift_sd' in power_bump: # add noise to phases phases_y += power_bump['phase_shift_sd'] * np.random.randn(len(f_range)) bump_x = signal_from_spectrum(t, b_powers, power_bump['amplitude'], phases=phases_x) bump_y = signal_from_spectrum(t, b_powers, power_bump['amplitude'], phases=phases_y) else: bump_x = signal_from_spectrum(t, b_powers, power_bump['amplitude']) bump_y = signal_from_spectrum(t, b_powers, power_bump['amplitude']) start, end = power_bump['time'] if 'start_sd' in power_bump: start += power_bump['start_sd'] * np.random.randn() start_idx, end_idx = int(start / dt), int(end / dt) in_t_range = lambda idx: idx >= start_idx and idx <= end_idx X += np.array([b if in_t_range(idx) else 0 for idx, b in enumerate(bump_x)]) Y += np.array([b if in_t_range(idx) else 0 for idx, b in enumerate(bump_y)]) return X, Y # returns multiple trials of single channel LFP recordings; all arguments are # passed as is to two_channel_lfp (second channel always ignored) def one_channel_lfp_n(n_trials, *args, **kwargs): return np.array([two_channel_lfp(*args, **kwargs)[0] for _ in range(n_trials)]) # returns multiple trials of two channel LFP recordings; all arguments are # passed as is to two_channel_lfp. def two_channel_lfp_n(n_trials, *args, **kwargs): nX = [[]] * n_trials nY = [[]] * n_trials for i in range(n_trials): nX[i], nY[i] = two_channel_lfp(*args, **kwargs) return np.array(nX), np.array(nY) #============================================================================== # Time Domain Analyses #============================================================================== # Calculates the distance between C(ref, ref+s) and C(t, t+s) for all t. This # serves as a measure of variation in autocovariance function over time. def autocov_distance(ref, t, T, dt, nX, scatter=None, ctr_step=5e-3): mX = np.mean(nX, axis=0) ref_idx = int(ref / dt) cov_ref = np.array([np.mean(nX[:,ref_idx] * nX[:,_t]) for _t in range(len(t))]) ctrs = np.arange(ref + 20e-3, T - 20e-3, ctr_step) dists = np.zeros(len(ctrs)) for idx, ctr in enumerate(ctrs): print ref, ctr ctr_idx = int(ctr / dt) cov_ctr = [np.mean(nX[:,ctr_idx] * nX[:,_t]) - mX[ctr_idx] * mX[_t] for _t in range(len(t))] _cov_ctr = cov_ctr[ctr_idx - ref_idx:] _cov_ref = cov_ref[:len(t) - ctr_idx + ref_idx] support = T - (ctr - ref) dists[idx] = np.sum(np.abs(_cov_ref - _cov_ctr)) / support if scatter: c = np.random.rand(3, 1) scatter.scatter(1000 * (t - ctr), cov_ctr, alpha=.2, facecolor=c, edgecolor=c, s=.2) dists = (dists - min(dists)) / (max(dists) - min(dists)) return ctrs, dists # Fits a multivariate autoregressive model (where the multiple dimensions # represent different trials) to the given single channel data over a rolling # window. At each window center the log(min(root)) of the characteristic # polynomial of the AR model is reported as a stability (stationarity) index. def AR_root_test(nX, dt, T, ax, c, win_len=20e-3, ctr_step=5e-3): win_ctr_ts = np.arange(win_len, T - 50e-3, ctr_step) #win_ctr_ts = np.arange(70e-3, 210e-3, ctr_step) sys.stderr.write('single trial AR #/%d: ' % len(nX)) # single trial AR model for idx, X in enumerate(nX): stability = [] sys.stderr.write('%d ' % (idx + 1)) for ctr_t in win_ctr_ts: ctr_t_idx = int((ctr_t - win_len / 2) / dt) max_t_idx = int((ctr_t + win_len / 2) / dt) ar_res = AR(X[ctr_t_idx:max_t_idx]).fit(5) stability_idx = np.log(min(np.abs(ar_res.roots))) stability.append(stability_idx) ax.plot(1000 * win_ctr_ts, stability, alpha=.2, c=c) sys.stderr.write('\n') # multiple trial VAR model sys.stderr.write('multiple trial VAR:\n') stability = [] for ctr_t in win_ctr_ts: ctr_t_idx = int((ctr_t - win_len / 2) / dt) max_t_idx = int((ctr_t + win_len / 2) / dt) ar_res = VAR(nX[:,ctr_t_idx:max_t_idx]).fit(5) stability_idx = np.log(min(np.abs(ar_res.roots))) sys.stderr.write(' (%.0f, %.0f): %.2f \n' % (1000 * (ctr_t - win_len / 2.), 1000 * (ctr_t + win_len / 2.), stability_idx)) stability.append(stability_idx) sys.stderr.write('\n') ax.plot(1000 * win_ctr_ts, stability, c=c, lw=4, alpha=.7) #============================================================================== # Frequency Domain Analyses #============================================================================== # Estimates the coherence by estimating spectral properties (cross-spectral # density and individual spectral densities) by averaging over trials. def coherence_with_spectral_avg(nX, nY, dt, ax, c): assert len(nX) == len(nY) n_trials = len(nX) sxy = np.array([csd(nX[i], nY[i], fs=1./dt)[1] for i in range(n_trials)]) sxx = np.array([welch(X, fs=1./dt)[1] for X in nX]) syy = np.array([welch(Y, fs=1./dt)[1] for Y in nY]) f, _ = csd(nX[0], nX[0], fs=1./dt) for i in range(len(sxy)): ax.plot(f, (np.abs(sxy[i]) ** 2) / (sxx[i] * syy[i]), c=c, alpha=.1, lw=1) # ||^2 coherence = (np.abs(np.mean(sxy, axis=0)) ** 2) / (np.mean(sxx, axis=0) * np.mean(syy, axis=0)) #coherence = np.abs(np.mean(sxy / (np.sqrt(sxx) * np.sqrt(syy)), axis=0)) ** 2 return f, coherence # <|S_xy|^2> # NOTE trial averaged coherence too depends on phase locking because the # cross-spectrals are added (the correct way above) and if the phase # difference is not constant; they cancel each other out. However, in a # non-phase locked coherence across trials: coherence = np.mean(np.abs(sxy) ** 2, axis=0) / (np.mean(sxx, axis=0) * np.mean(syy, axis=0)) return f, coherence def wavelet_plv(nX, nY, dt): assert len(nX) == len(nY) n_trials = len(nX) phase_diffs = None sys.stderr.write(' # ') for i in range(n_trials): sys.stderr.write('%d ' % (i + 1)) wa_x = WaveletAnalysis(nX[i], dt=dt) wa_y = WaveletAnalysis(nY[i], dt=dt) diff = np.angle(wa_x.wavelet_transform) - np.angle(wa_y.wavelet_transform) if phase_diffs is None: phase_diffs = np.zeros((n_trials,) + diff.shape) phase_diffs[i] = diff sys.stderr.write('\n') return phase_diffs, wa_x.time, 1. / wa_x.fourier_periods def wavelet_coherence(nX, nY, dt): assert len(nX) == len(nY) n_trials = len(nX) s_xx, s_yy, s_xy = None, None, None sys.stderr.write(' # ') for i in range(n_trials): sys.stderr.write('%d ' % (i + 1)) wa_x = WaveletAnalysis(nX[i], dt=dt) wa_y = WaveletAnalysis(nY[i], dt=dt) X_w, Y_w = wa_x.wavelet_transform, wa_y.wavelet_transform if s_xx is None: shape = (n_trials,) + X_w.shape s_xx, s_yy = np.zeros(shape), np.zeros(shape) # add 0j to make the array a complex object; cf. http://stackoverflow.com/a/11965466 s_xy = np.zeros(shape) + 0j s_xx[i] = np.abs(X_w) ** 2 s_yy[i] = np.abs(Y_w) ** 2 s_xy[i] = np.multiply(np.conj(X_w), Y_w) sys.stderr.write('\n') return s_xx, s_yy, s_xy, wa_x.time, 1. / wa_x.fourier_periods #============================================================================== # Plotting helpers #============================================================================== def setup_axis(ax, xlabel, ylabel): ax.grid(True) ax.xaxis.set_label_coords(.9, -.07) ax.set_xlabel(xlabel) ax.set_ylabel(ylabel, rotation='vertical') return ax def highlight_power_bumps(ax, power_bumps, c='b'): for bump in power_bumps: start, end = 1000 * bump['time'][0], 1000 * bump['time'][1] ax.axvspan(start, end, alpha=0.1, color=c) #============================================================================== # Experiments #============================================================================== def plot_sample_lfp(path): T = 400e-3 dt = 10e-5 A = 30 t = np.arange(0, T, dt) fig = plt.figure(figsize=(16, 12)) def _plot(ax_lfp, ax_pow, bumps): X, _ = two_channel_lfp(t, dt, A, snr=20, power_bumps=bumps) ax_lfp.plot(1000 * t, X, c='k', alpha=.7) highlight_power_bumps(ax_lfp, bumps) f, power = rfftfreq(X.size, d=dt), np.abs(rfft(X)) ** 2 ax_pow.plot(f, power, alpha=.7, c='k', lw=2) ax_pow.set_xscale('log') ax_pow.set_yscale('log') if bumps: ax_pow.axvspan(*bumps[0]['freq'], alpha=0.3, color='b') f, power = welch(X, fs=1./dt) ax_pow.plot(f, power, alpha=.7, c='r', lw=2) bumps = [] ax_lfp = setup_axis(fig.add_subplot(2, 2, 1), 'time (ms)', 'LFP ($\mu V$)') ax_pow = setup_axis(fig.add_subplot(2, 2, 3), 'frequency (Hz)', 'power density') _plot(ax_lfp, ax_pow, bumps) bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 35), 'start_sd': 0, 'amplitude': 10}] ax_lfp = setup_axis(fig.add_subplot(2, 2, 2), 'time (ms)', 'LFP ($\mu V$)') ax_pow = setup_axis(fig.add_subplot(2, 2, 4), 'frequency (Hz)', 'power density') _plot(ax_lfp, ax_pow, bumps) fig.savefig(path, dpi=500) sys.stderr.write('saved plot to %s.\n' % path) def plot_stationarity_measures(path): T = 400e-3 dt = 10e-5 A = 30 t = np.arange(0, T, dt) # NOTE discovery: if bump amplitude is ~ 10 => way less reliable bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 35), 'start_sd': 5e-3, 'amplitude': 10}] #bumps = [{'time': ( 50e-3, 150e-3), 'freq': (30, 35), 'start_sd': 0, 'amplitude': 30}, #{'time': (200e-3, 300e-3), 'freq': (60, 65), 'start_sd': 10e-3, 'amplitude': 30}] n_trials = 1000 nX = one_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps) fig = plt.figure(figsize=(10, 16)) mX = np.mean(nX, axis=0) ax = setup_axis(fig.add_subplot(4, 1, 1), 'time (ms)', 'LFP ($\mu V$)') ax.plot(1000 * t, nX[0], alpha=.7, c='k') ax.plot(1000 * t, mX, alpha=.7, c='g', lw=3) highlight_power_bumps(ax, bumps) ax = setup_axis(fig.add_subplot(4, 1, 2), 'time (ms)', 'LFP Variance') vX = np.var(nX, axis=0) ax.plot(1000 * t, vX, alpha=.7, c='k') ax = setup_axis(fig.add_subplot(4, 1, 4), 'lag $\\tau$ (ms)', '$C(t_\circ, t_\circ + \\tau)$') ctrs, dists = autocov_distance(0, t, T, dt, nX, scatter=ax, ctr_step=5e-3) ax = setup_axis(fig.add_subplot(4, 1, 3), '$t_\circ$ (ms)', '$\\big \\lVert C(0, \cdot) - C(t_\circ, \cdot) \\big \\rVert_1$') ax.plot(1000 * ctrs, dists, c='k', alpha=.7, lw=2) ax.set_xlim(0, 1000 * T) ax.set_ylim(0, 1.1) fig.savefig(path, dpi=500) sys.stderr.write('saved plot to %s.\n' % path) def plot_ar_stability_measure(path): T = 400e-3 dt = 25e-5 A = 30 t = np.arange(0, T, dt) n_trials = 15 fig = plt.figure(figsize=(20, 8)) win_len = 30e-3 ctr_step = 5e-3 # NOTE discovery: if bump amplitude is ~ 10 => way less reliable #bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': 30}] bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': 10}] nX = one_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps) ax = setup_axis(fig.add_subplot(2, 2, 1), 'time (ms)', 'LFP ($\mu V$)') ax.plot(1000 * t, nX[0], c='k', alpha=.7) highlight_power_bumps(ax, bumps) ax = setup_axis(fig.add_subplot(2, 2, 3, sharex=ax), 'center of %.1f ms window' % (1000 * win_len), 'Stability Index ($\log \min |\lambda_i|$)') AR_root_test(nX, dt, T, ax, 'k', win_len=win_len, ctr_step=ctr_step) ax.set_ylim(min(0, ax.get_ylim()[0]) - .1, None) ax.axhline(0, ax.get_xlim()[0], ax.get_xlim()[1], c='r', lw=4, alpha=.4) # ----------- # without power bumps # ----------- nX_stationary = one_channel_lfp_n(n_trials, t, dt, A) ax = setup_axis(fig.add_subplot(2, 2, 2), 'time (ms)', 'LFP ($\mu V$)') ax.plot(1000 * t, nX_stationary[0], c='k', alpha=.7) ax = setup_axis(fig.add_subplot(2, 2, 4, sharex=ax), 'center of %.1f ms window' % (1000 * win_len), 'Stability Index ($\log \min |\lambda_i|$)') AR_root_test(nX_stationary, dt, T, ax, 'k', win_len=win_len, ctr_step=ctr_step) ax.set_ylim(min(0, ax.get_ylim()[0]) - .1, None) ax.axhline(0, ax.get_xlim()[0], ax.get_xlim()[1], c='r', lw=4, alpha=.4) fig.savefig(path, dpi=500) sys.stderr.write('saved plot to %s.\n' % path) def plot_coherence_measures(path): T = 400e-3 dt = 10e-5 A = 30 t = np.arange(0, T, dt) n_trials = 100 fig = plt.figure(figsize=(16, 12)) # NOTE assumes there is only one bump def _plot(bumps, ax_traces, ax_coherence): nX, nY = two_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps) ax_traces[0].plot(1000 * t, nX[0], c='k', alpha=.5) ax_traces[0].plot(1000 * t, nY[0], c='m', alpha=.5) highlight_power_bumps(ax_traces[0], bumps) ax_traces[1].plot(1000 * t, nX[1], c='k', alpha=.5) ax_traces[1].plot(1000 * t, nY[1], c='m', alpha=.5) highlight_power_bumps(ax_traces[1], bumps) ax_coherence.axvspan(*bumps[0]['freq'], alpha=0.3, color='b') f, coh = coherence_with_spectral_avg(nX, nY, dt, ax_coherence, 'r') ax_coherence.plot(f, coh, c='r', lw=5, alpha=.7) # coherence only over bump bump_t = bumps[0]['time'] nX = np.array([X[int(bump_t[0] / dt):int(bump_t[1] / dt)] for X in nX]) nY = np.array([Y[int(bump_t[0] / dt):int(bump_t[1] / dt)] for Y in nY]) f, coh = coherence_with_spectral_avg(nX, nY, dt, ax_coherence, 'g') # add some noise to coherence plots so both red and green coherence in # non-phase locked example are visible coh += np.max(np.array([np.zeros(len(coh)), .01 * np.random.randn(len(coh))])) ax_coherence.plot(f, coh, c='g', lw=6, alpha=.7) ax_coherence.set_xlim(0, 250) ax_coherence.set_ylim(-.1, 1) bump_A = 30 #bump_A = 5 # ----------------- # with phase locking # ----------------- bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': bump_A, 'phase_shift': np.pi / 2.}] ax_traces = [None, None] ax_traces[0] = setup_axis(fig.add_subplot(3, 2, 1), 'time (ms)', 'LFP ($\mu V$)') ax_traces[1] = setup_axis(fig.add_subplot(3, 2, 3), 'time (ms)', 'LFP ($\mu V$)') ax_coherence = setup_axis(fig.add_subplot(3, 2, 5), 'frequency (Hz)', '$|\widehat{\gamma}_{xy}|^2$') _plot(bumps, ax_traces, ax_coherence) # ----------------- # without phase locking # ----------------- bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': bump_A}] ax_traces = [None, None] ax_traces[0] = setup_axis(fig.add_subplot(3, 2, 2), 'time (ms)', 'LFP ($\mu V$)') ax_traces[1] = setup_axis(fig.add_subplot(3, 2, 4), 'time (ms)', 'LFP ($\mu V$)') ax_coherence = setup_axis(fig.add_subplot(3, 2, 6), 'frequency (Hz)', '$|\widehat{\gamma}_{xy}|^2$') _plot(bumps, ax_traces, ax_coherence) fig.savefig(path, dpi=500) sys.stderr.write('saved plot to %s.\n' % path) def plot_wavelet_plv(path): T = 400e-3 dt = 10e-5 A = 30 t = np.arange(0, T, dt) fig = plt.figure(figsize=(12, 12)) n_trials = 100 # ----------------- # with phase locking # ----------------- bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': 5, 'phase_shift': 1}] nX, nY = two_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps) sys.stderr.write('%d phase-locked trials' % n_trials) phase_diffs, times, freqs = wavelet_plv(nX, nY, dt) times, freqs = np.meshgrid(1000 * times, freqs) # first 5 trials only ax = setup_axis(fig.add_subplot(2, 2, 1), 'time (ms)', 'frequency (Hz)') plv = np.abs(np.mean(np.exp(1j * phase_diffs[:5]), axis=0)) ax.contourf(times, freqs, plv, 100) ax.set_ylim(0, 250) # all trials ax = setup_axis(fig.add_subplot(2, 2, 3), 'time (ms)', 'frequency (Hz)') plv = np.abs(np.mean(np.exp(1j * phase_diffs), axis=0)) ax.contourf(times, freqs, plv, 100) ax.set_ylim(0, 250) # ----------------- # without phase locking # ----------------- bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': 5}] nX, nY = two_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps) sys.stderr.write('%d non-phase-locked trials' % n_trials) phase_diffs, times, freqs = wavelet_plv(nX, nY, dt) times, freqs = np.meshgrid(1000 * times, freqs) # first 5 trials only ax = setup_axis(fig.add_subplot(2, 2, 2), 'time (ms)', 'frequency (Hz)') plv = np.abs(np.mean(np.exp(1j * phase_diffs[:5]), axis=0)) ax.contourf(times, freqs, plv, 100) ax.set_ylim(0, 250) # all trials ax = setup_axis(fig.add_subplot(2, 2, 4), 'time (ms)', 'frequency (Hz)') plv = np.abs(np.mean(np.exp(1j * phase_diffs), axis=0)) contours = ax.contourf(times, freqs, plv, 100) ax.set_ylim(0, 250) fig.subplots_adjust(right=0.8) fig.colorbar(contours, cax=fig.add_axes([0.85, 0.35, 0.01, 0.3])) fig.savefig(path, dpi=500) sys.stderr.write('saved plot to %s.\n' % path) def plot_wavelet_coherence(path): T = 400e-3 dt = 10e-5 A = 30 t = np.arange(0, T, dt) fig = plt.figure(figsize=(12, 12)) n_trials = 100 # ----------------- # with phase locking # ----------------- bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': 5, 'phase_shift': 1}] nX, nY = two_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps) sys.stderr.write('%d phase-locked trials' % n_trials) s_xx, s_yy, s_xy, times, freqs = wavelet_coherence(nX, nY, dt) times, freqs = np.meshgrid(1000 * times, freqs) # first 5 trials only ax = setup_axis(fig.add_subplot(2, 2, 1), 'time (ms)', 'frequency (Hz)') coh = (np.abs(np.mean(s_xy[:5], axis=0)) ** 2) / (np.mean(s_xx[:5], axis=0) * np.mean(s_yy[:5], axis=0)) ax.contourf(times, freqs, coh, 100) ax.set_ylim(0, 250) # all trials ax = setup_axis(fig.add_subplot(2, 2, 3), 'time (ms)', 'frequency (Hz)') coh = (np.abs(np.mean(s_xy, axis=0)) ** 2) / (np.mean(s_xx, axis=0) * np.mean(s_yy, axis=0)) ax.contourf(times, freqs, coh, 100) ax.set_ylim(0, 250) # ----------------- # without phase locking # ----------------- bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': 5}] nX, nY = two_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps) sys.stderr.write('%d non-phase-locked trials' % n_trials) s_xx, s_yy, s_xy, times, freqs = wavelet_coherence(nX, nY, dt) times, freqs = np.meshgrid(1000 * times, freqs) # first 5 trials only ax = setup_axis(fig.add_subplot(2, 2, 2), 'time (ms)', 'frequency (Hz)') coh = (np.abs(np.mean(s_xy[:5], axis=0)) ** 2) / (np.mean(s_xx[:5], axis=0) * np.mean(s_yy[:5], axis=0)) ax.contourf(times, freqs, coh, 100) ax.set_ylim(0, 250) # all trials ax = setup_axis(fig.add_subplot(2, 2, 4), 'time (ms)', 'frequency (Hz)') coh = (np.abs(np.mean(s_xy, axis=0)) ** 2) / (np.mean(s_xx, axis=0) * np.mean(s_yy, axis=0)) contours = ax.contourf(times, freqs, coh, 100) ax.set_ylim(0, 250) fig.subplots_adjust(right=0.8) fig.colorbar(contours, cax=fig.add_axes([0.85, 0.35, 0.01, 0.3])) fig.savefig(path, dpi=500) sys.stderr.write('saved plot to %s.\n' % path) def plot_example_resultant(path): def _plot_resultant(ax, n, phase_mean, phase_sd): vectors = np.exp(1j * (phase_mean + phase_sd * np.random.randn(n))) quiver_kw = { 'angles': 'xy', 'scale_units': 'xy', 'scale': 1 } ax.quiver(np.zeros(n), np.zeros(n), vectors.real, vectors.imag, width=5e-3, alpha=.2, color='k', **quiver_kw) resultant = np.mean(vectors) ax.quiver(np.zeros(1), np.zeros(1), resultant.real, resultant.imag, width=10e-3, alpha=.7, color='r', **quiver_kw) ax.set_aspect('equal') ax.set_xlim(-1.1, 1.1) ax.set_ylim(-1.1, 1.1) # draw the unit circle t = np.linspace(0, 2 * np.pi, 1000) ax.plot(np.cos(t), np.sin(t), c='b', lw=2, alpha=.2) phase_mean = np.pi / 4. n = 500 fig = plt.figure(figsize=(12, 6)) ax = setup_axis(fig.add_subplot(1, 2, 1), 'Re', 'Im') _plot_resultant(ax, n, phase_mean, .06 * np.pi) ax = setup_axis(fig.add_subplot(1, 2, 2), 'Re', 'Im') _plot_resultant(ax, n, phase_mean, .6 * np.pi) fig.savefig(path, dpi=300) sys.stderr.write('saved plot to %s.\n' % path) def plot_morlet_wavelets(path): fig = plt.figure(figsize=(12, 8)) ax_re = setup_axis(fig.add_subplot(2, 2, 1), 'time', 'Re\{$\psi(\\frac{t-\\tau}{s})$\}') ax_im = setup_axis(fig.add_subplot(2, 2, 2), 'time', 'Im\{$\psi(\\frac{t-\\tau}{s})$\}') t = np.linspace(-5, 5, 10000) def wavelet(t, s, tau): _t = (t - tau) / s return np.pi ** -.25 * np.exp(-_t**2 / 2) * np.exp(-6j * _t) w = wavelet(t, 1, 0) ax_re.plot(t, w.real, 'k', lw=1, alpha=.9) ax_im.plot(t, w.imag, 'k', lw=1, alpha=.9) w = wavelet(t, 2, 0) ax_re.plot(t, w.real, 'm', lw=4, alpha=.2) ax_im.plot(t, w.imag, 'm', lw=4, alpha=.2) ax_re = setup_axis(fig.add_subplot(2, 2, 3), 'time', 'Re\{$\psi(\\frac{t-\\tau}{s})$\}') ax_im = setup_axis(fig.add_subplot(2, 2, 4), 'time', 'Im\{$\psi(\\frac{t-\\tau}{s})$\}') w = wavelet(t, 1, 0) ax_re.plot(t, w.real, 'k', lw=1, alpha=.9) ax_im.plot(t, w.imag, 'k', lw=1, alpha=.9) w = wavelet(t, 1, 2) ax_re.plot(t, w.real, 'm', lw=4, alpha=.2) ax_im.plot(t, w.imag, 'm', lw=4, alpha=.2) fig.savefig(path, dpi=500) if __name__ == '__main__': plot_sample_lfp('lfp.png') plot_stationarity_measures('stationarity.png') plot_ar_stability_measure('AR-stability-index.png') plot_coherence_measures('coherence.png') plot_wavelet_plv('wavelet-plv.png') plot_wavelet_coherence('wavelet-coherence.png') plot_example_resultant('resultant.png') plot_morlet('morlet.png')