Estimate the MVAR model order across trials.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Implements Partial Directed Coherence and Directed Transfer Function
using multi-trial MVAR processes.

References
----------
[1] Luiz A. Baccala and Koichi Sameshima. Partial directed coherence:
    a new concept in neural structure determination.
    Biological Cybernetics, 84(6):463-474, 2001.
[2] Mingzhou Ding, Yonghong Chen. Granger Causality: Basic Theory and
    Application to Neuroscience. Elsevier Science, 7 February 2008.
"""
# Authors: Alexandre Gramfort <alexandre.gramfort@telecom-paristech.fr>
#          Dong Qunxi <dongqunxi@gmail.com>
# License: BSD (3-clause)
import math

import numpy as np
from scipy import fftpack, linalg, signal
import matplotlib.pyplot as plt
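
# Notation for orientation (standard definitions; the functions below compute
# noise-weighted variants when ``sigma`` is given). With
# A(f) = sum_k A_k * exp(-2j * pi * f * k) and H(f) = (I - A(f))^{-1}:
#
#   DTF_ij(f) = |H_ij(f)| / sqrt(sum_k |H_ik(f)|^2)        (inflow into i)
#   PDC_ij(f) = |Abar_ij(f)| / sqrt(sum_k |Abar_kj(f)|^2)  (outflow from j)
#
# where Abar(f) = I - A(f).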


def mvar_generate(A, n, sigma, burnin=500):
    """Simulate an MVAR process.

    Parameters
    ----------
    A : ndarray, shape (p, N, N)
        The AR coefficients where N is the number of signals
        and p the order of the model.
    n : int
        The number of time samples.
    sigma : array, shape (N,)
        The noise for each time series.
    burnin : int
        The length of the burnin period (in samples).

    Returns
    -------
    X : ndarray, shape (N, n)
        The N time series of length n.
    """
    p, N, N = A.shape
    A_2d = np.concatenate(A, axis=1)
    Y = np.zeros((n + burnin, N))
    sigma = np.diag(sigma)
    mu = np.zeros(N)

    # iterate the process, then discard the burn-in samples
    for i in range(p, n + burnin):
        w = np.random.multivariate_normal(mu, sigma)
        Y[i] = np.dot(A_2d, Y[i - p:i][::-1, :].ravel()) + w

    return Y[burnin:].T
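
# Example (a hedged sketch, not part of the original gist): simulate a stable
# bivariate AR(1) process where signal 0 drives signal 1.
#
#     A = np.array([[[0.9, 0.0],
#                    [0.5, 0.5]]])                # shape (p=1, N=2, N=2)
#     X = mvar_generate(A, n=1000, sigma=np.array([1., 1.]))
#     # X.shape -> (2, 1000)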


def normalize_data(data, factor):
    """Normalize multi-trial data.

    1) Decrease the sampling rate by an integer factor.
    2) Remove the linear trend of the data across trials.
    3) Subtract the temporal mean of the data.
    4) Subtract the ensemble mean of the data.

    Parameters
    ----------
    data : array, shape (channels, points, trials)
        Representative STCs for each ROI.
    factor : int
        Factor for downsampling.

    Returns
    -------
    C : array
        The normalized data.
    """
    # downsample the signals by the given factor
    data = signal.decimate(x=data, q=factor, axis=1)

    # detrend each channel: fit a linear polynomial to the trial-averaged
    # signal (evaluated with Horner's scheme) and subtract it from each trial
    channels, points, trials = data.shape
    pdeg = np.ones(channels)
    x = np.arange(points) + 1
    mu = np.mean(x)
    sig = np.std(x, ddof=1)
    x = (x - mu) / sig
    P = np.zeros((channels, points))
    p = []
    for i in range(channels):
        d = pdeg[i]
        d1 = int(d + 1)
        V = np.zeros((d1, points))
        V[d1 - 1, :] = np.ones(points)
        j = int(d)
        while j > 0:
            V[j - 1, :] = x * V[j, :]
            j = j - 1
        temp = np.mean(data[i, :, :], axis=-1)
        p.append(np.linalg.lstsq(V.T, temp)[0].T)
        P[i, :] = p[i][0] * np.ones(points)
        for j in range(1, d1):
            P[i, :] = x * P[i, :] + p[i][j]
    Y = np.zeros(data.shape)
    for r in range(trials):
        Y[:, :, r] = data[:, :, r] - P

    # subtract the temporal mean within each trial
    C = Y.transpose(1, 0, 2)
    points, channels, trials = C.shape
    for i in range(channels):
        b = C[:, i, :]
        c = np.mean(b, axis=0)
        for j in range(points):
            C[j, i, :] = C[j, i, :] - c

    # subtract the ensemble mean across trials
    for i in range(channels):
        b = C[:, i, :]
        d = np.mean(b, axis=-1)
        for j in range(trials):
            C[:, i, j] = C[:, i, j] - d

    C = C.transpose(1, 0, 2)
    return C
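
# Example (a hedged sketch): with factor=1 no samples are dropped (decimate
# still applies its anti-aliasing filter) and the remaining steps detrend and
# demean, as in the __main__ block below.
#
#     data = np.random.randn(5, 1000, 50)         # (channels, points, trials)
#     C = normalize_data(data, factor=1)
#     # C.shape -> (5, 1000, 50)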


def cov(X, p):
    """Compute the vector autocovariance up to order p.

    Parameters
    ----------
    X : ndarray, shape (N, n)
        The N time series of length n.
    p : int
        The maximum lag.

    Returns
    -------
    R : ndarray, shape (p + 1, N, N)
        The autocovariance up to order p.
    """
    N, n = X.shape
    R = np.zeros((p + 1, N, N))
    for k in range(p + 1):
        R[k] = (1. / float(n - k)) * np.dot(X[:, :n - k], X[:, k:].T)
    return R


def mvar_fit(X, p):
    """Fit an MVAR model of order p using the Yule-Walker equations.

    Parameters
    ----------
    X : ndarray, shape (N, n, m)
        The first index references variables, the second observations
        and the third trials.
    p : int
        The candidate model order.

    Returns
    -------
    A : ndarray, shape (p, N, N)
        The AR coefficients where N is the number of signals
        and p the order of the model.
    sigma : ndarray, shape (N, N)
        The noise covariance matrix.
    """
    N, n, m = X.shape

    # average the autocovariance over trials
    gammas = np.zeros((m, p + 1, N, N))
    for i in range(m):
        gammas[i] = cov(X[:, :, i], p)
    gamma = gammas.mean(axis=0)

    G = np.zeros((p * N, p * N))
    gamma2 = np.concatenate(gamma, axis=0)
    gamma2[:N, :N] /= 2.

    for i in range(p):
        G[N * i:, N * i:N * (i + 1)] = gamma2[:N * (p - i)]

    G = G + G.T  # big block matrix

    gamma4 = np.concatenate(gamma[1:], axis=0)

    phi = linalg.solve(G, gamma4)  # solve the Yule-Walker equations

    tmp = np.dot(gamma4[:N * p].T, phi)
    sigma = gamma[0] - tmp - tmp.T + np.dot(phi.T, np.dot(G, phi))

    phi = np.reshape(phi, (p, N, N))
    for k in range(p):
        phi[k] = phi[k].T

    return phi, sigma
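
# Example (a hedged sketch): fit a model to several simulated trials and
# compare the recovered coefficients with the ground truth.
#
#     A = np.array([[[0.9, 0.0],
#                    [0.5, 0.5]]])
#     X = np.dstack([mvar_generate(A, 1000, np.ones(2)) for _ in range(20)])
#     A_est, sigma = mvar_fit(X, p=1)
#     # A_est[0] should be close to A[0]; sigma close to the identity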


def compute_order(X, p_max):
    """Estimate the AR model order with BIC.

    Parameters
    ----------
    X : ndarray, shape (N, n, m)
        The first index references variables, the second observations
        and the third trials.
    p_max : int
        The maximum model order to test.

    Reference
    ---------
    [2] provides the equation: BIC(p) = 2 * log[det(Σ)]
        + 2 * (N ** 2) * p * log(n * m) / (n * m), where Σ is the noise
        covariance matrix, N the number of variables, p the model order,
        n the trial length and m the number of trials.

    Returns
    -------
    p : int
        The estimated order.
    bic : ndarray, shape (p_max + 1,)
        The BIC for the orders from 0 to p_max.
    """
    N, n, m = X.shape

    bic = np.empty(p_max + 1)
    bic[0] = np.inf  # the empty model is never a candidate

    for p in range(1, p_max + 1):
        print(p)
        A, sigma = mvar_fit(X, p)
        bic[p] = 2 * np.log(linalg.det(sigma))
        bic[p] += 2 * p * (N ** 2) * np.log(n * m) / (n * m)

    p = np.argmin(bic)
    return p, bic
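
# Example (a hedged sketch, assuming the X from the mvar_fit example above):
# for data generated from an order-1 model the BIC curve should bottom out
# at (or near) p = 1.
#
#     p, bic = compute_order(X, p_max=10)
#     # bic[0] is set to inf, so order 0 is never selected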


def spectral_density(A, n_fft=None):
    """Estimate the spectral density from the AR coefficients.

    Parameters
    ----------
    A : ndarray, shape (p, N, N)
        The AR coefficients where N is the number of signals
        and p the order of the model.
    n_fft : int
        The length of the FFT.

    Returns
    -------
    fA : ndarray, shape (n_fft, N, N)
        The estimated spectral density.
    freqs : ndarray, shape (n_fft,)
        The frequencies (in cycles per sample) of the FFT bins.
    """
    p, N, N = A.shape
    if n_fft is None:
        n_fft = max(int(2 ** math.ceil(np.log2(p))), 512)
    A2 = np.zeros((n_fft, N, N))
    A2[1:p + 1, :, :] = A  # start at 1 !
    fA = fftpack.fft(A2, axis=0)
    freqs = fftpack.fftfreq(n_fft)
    I = np.eye(N)
    for i in range(n_fft):
        # transfer function H(f) = (I - A(f))^{-1} at each FFT bin
        fA[i] = linalg.inv(I - fA[i])
    return fA, freqs
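
# Example (a hedged sketch, assuming the A_est fitted above):
#
#     H, freqs = spectral_density(A_est, n_fft=512)
#     # H.shape -> (512, N, N); freqs are in cycles per sample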


def DTF(A, sigma=None, n_fft=None):
    """Compute the Directed Transfer Function (DTF).

    Parameters
    ----------
    A : ndarray, shape (p, N, N)
        The AR coefficients where N is the number of signals
        and p the order of the model.
    sigma : array, shape (N,)
        The noise for each time series.
    n_fft : int
        The length of the FFT.

    Returns
    -------
    D : ndarray, shape (n_fft, N, N)
        The estimated DTF.
    freqs : ndarray, shape (n_fft,)
        The frequencies (in cycles per sample) of the FFT bins.
    """
    p, N, N = A.shape

    if n_fft is None:
        n_fft = max(int(2 ** math.ceil(np.log2(p))), 512)

    H, freqs = spectral_density(A, n_fft)
    D = np.zeros((n_fft, N, N))

    if sigma is None:
        sigma = np.ones(N)

    for i in range(n_fft):
        S = H[i]
        # normalize each row by the total (noise-weighted) inflow
        V = (S * sigma[None, :]).dot(S.T.conj())
        V = np.abs(np.diag(V))
        D[i] = np.abs(S * np.sqrt(sigma[None, :])) / np.sqrt(V)[:, None]

    return D, freqs
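
# Example (a hedged sketch): row i of D[k] is the normalized inflow into
# signal i at frequency freqs[k], so the squared entries of each row sum to 1.
#
#     D, freqs = DTF(A_est)                      # unweighted (sigma=None)
#     # (D[k] ** 2).sum(axis=1) -> ones(N) at every bin k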


def PDC(A, sigma=None, n_fft=None):
    """Compute partial directed coherence (PDC).

    Parameters
    ----------
    A : ndarray, shape (p, N, N)
        The AR coefficients where N is the number of signals
        and p the order of the model.
    sigma : array, shape (N,)
        The noise for each time series.
    n_fft : int
        The length of the FFT.

    Returns
    -------
    P : ndarray, shape (n_fft, N, N)
        The estimated PDC.
    freqs : ndarray, shape (n_fft,)
        The frequencies (in cycles per sample) of the FFT bins.
    """
    p, N, N = A.shape

    if n_fft is None:
        n_fft = max(int(2 ** math.ceil(np.log2(p))), 512)

    H, freqs = spectral_density(A, n_fft)
    P = np.zeros((n_fft, N, N))

    if sigma is None:
        sigma = np.ones(N)

    for i in range(n_fft):
        B = H[i]
        B = linalg.inv(B)
        V = np.abs(np.dot(B.T.conj(), B * (1. / sigma[:, None])))
        V = np.diag(V)  # denominator squared
        P[i] = np.abs(B * (1. / np.sqrt(sigma))[None, :]) / np.sqrt(V)[None, :]

    return P, freqs
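
# Example (a hedged sketch): column j of P[k] is the normalized outflow from
# signal j at frequency freqs[k], so the squared entries of each column sum
# to 1.
#
#     P, freqs = PDC(A_est)                      # unweighted (sigma=None)
#     # (P[k] ** 2).sum(axis=0) -> ones(N) at every bin k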


def plot_all(freqs, P, name):
    """Plot a grid of subplots, one per pair of signals."""
    m, N, N = P.shape
    pos_freqs = freqs[freqs >= 0]

    f, axes = plt.subplots(N, N)
    for i in range(N):
        for j in range(N):
            axes[i, j].fill_between(pos_freqs, P[freqs >= 0, i, j], 0)
            axes[i, j].set_xlim([0, np.max(pos_freqs)])
            axes[i, j].set_ylim([0, 1])
    plt.suptitle(name)
    plt.tight_layout()


if __name__ == '__main__':
    plt.close('all')

    # example from the paper
    A = np.zeros((3, 5, 5))
    A[0, 0, 0] = 0.95 * math.sqrt(2)
    A[1, 0, 0] = -0.9025
    A[1, 1, 0] = 0.5
    A[2, 2, 0] = -0.4
    A[1, 3, 0] = -0.5
    A[0, 3, 3] = 0.25 * math.sqrt(2)
    A[0, 3, 4] = 0.25 * math.sqrt(2)
    A[0, 4, 3] = -0.25 * math.sqrt(2)
    A[0, 4, 4] = 0.25 * math.sqrt(2)

    # simulate the processes, one MVAR realization per trial
    n = 10 ** 3
    # sigma = np.array([0.0001, 1, 1, 1, 1])
    # sigma = np.array([0.01, 1, 1, 1, 1])
    sigma = np.array([1., 1., 1., 1., 1.])
    trials = 50
    Y = np.zeros((5, n, trials))
    for i in range(trials):
        Y[:, :, i] = mvar_generate(A, n, sigma)
    X = normalize_data(Y, factor=1)

    # estimate the AR order with BIC (flip the toggle to use a fixed order)
    if 1:
        p_max = 20
        p, bic = compute_order(X, p_max=p_max)

        plt.figure()
        plt.plot(np.arange(p_max + 1), bic)
        plt.xlabel('order')
        plt.ylabel('BIC')
        plt.show()
    else:
        p = 3

    A_est, sigma = mvar_fit(X, p)
    sigma = np.diag(sigma)  # DTF + PDC support diagonal noise
    # sigma = None

    # compute DTF
    D, freqs = DTF(A_est, sigma)
    plot_all(freqs, D, 'DTF')

    # compute PDC
    P, freqs = PDC(A_est, sigma)
    plot_all(freqs, P, 'PDC')

    plt.show()