Last active
March 28, 2019 11:32
-
-
Save cosacog/6f31d26da2fc41c43ef76164772f5629 to your computer and use it in GitHub Desktop.
EDFを読み込んでbipolar montageのpowerspctrumを計算. 説明は下にあります。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import mne, os | |
import numpy as np | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
class SurfaceEmg: | |
def __init__(self, path_edf, path_ch_pairs, sec_window=5.0, minute_analysis=3): | |
# load edf | |
self._raw = mne.io.read_raw_edf(path_edf, preload=True) | |
# load channel pairs | |
self.ch_pairs = self._check_ch_pairs(path_ch_pairs) | |
self.n_pairs = self.ch_pairs.shape[0] | |
# self.calc_powspctrms() # set self.powspctrms_list, self.freqs, self.n_epochs | |
self.annotations = self._raw.annotations # .description, .onset etc | |
self.dict_kwd_t_onset = {} | |
self._minute_analysis = minute_analysis # minutes | |
self._sec_window = sec_window | |
print("Done.") | |
def calc_powspctrms(self): | |
''' | |
set powerspctram for all channel pair | |
''' | |
self.dict_powspctrms = {} | |
for kwd in self.dict_kwd_t_onset.keys(): | |
print("Calculate powerspectrum based on tag:'{0}'...".format(kwd)) | |
powspctrms_list = [] | |
for ii in np.arange(self.n_pairs): | |
g1 = self.ch_pairs.iloc[ii, 0] | |
g2 = self.ch_pairs.iloc[ii, 1] | |
print("Calculating {0}-{1} pair...".format(g1, g2)) | |
powspctrms, freqs = self._calc_powspctrms(G1=g1, G2=g2, kword=kwd, | |
t_window=self._sec_window, minute_analysis=self._minute_analysis) | |
powspctrms_list.append(powspctrms) | |
self.dict_powspctrms[kwd] = powspctrms_list | |
self.n_epochs = powspctrms.shape[0] | |
self.freqs = freqs | |
print("Done.") | |
def get_annots(self): | |
print(self.annotations.description) | |
def get_channel_pairs(self): | |
print(self.ch_pairs) | |
def plot_all_ch_powspctrms(self, kwd, xlim=[0, 50], log_transform=False): | |
''' | |
plot powerspectrams for all channel pair | |
kwd: search word. e.g.'start 3Hz' | |
''' | |
for ii in np.arange(self.n_pairs): | |
ttl_ch = ' '.join(list(self.ch_pairs.iloc[ii,:])) | |
ttl = str(ii) + '. ' + ttl_ch + ' :'+ kwd | |
self._plot_powspctrms(kwd, ii, log_transform, xlim=xlim, title=ttl) | |
def save_powspctrms(self, kwd ,dir_save, freq_lim=(0,50), log_transform=False): | |
''' | |
save powerspectrm | |
set kwd | |
''' | |
powspctrms = self.dict_powspctrms[kwd] | |
freqs = self.freqs[(self.freqs>=freq_lim[0]) & (self.freqs<=freq_lim[1])] | |
kwd_underscored = kwd.replace(' ', '_') | |
str_sec_window = str(self._sec_window) | |
ch_pairs = [] | |
for ii in np.arange(self.n_pairs): | |
ch_pair = '_'.join(list(self.ch_pairs.iloc[ii,:])) | |
ch_pairs.append(ch_pair) | |
fname_save = "{0}_{1}_analysis{2}min_window{3}sec.csv".format( | |
ch_pair, | |
kwd_underscored, | |
self._minute_analysis, | |
str_sec_window.replace('.','_')) | |
powspctrms_seg = powspctrms[ii][:,(self.freqs >= freq_lim[0])&(self.freqs <= freq_lim[1])] | |
# save power spectrum epochs | |
self._save_powspctrms(powspctrms_seg, freqs, dir_save, fname_save, log_transform) | |
# save mean powerspectrum | |
if log_transform: | |
mean_powspctrm = np.mean(np.log10(powspctrms_seg), axis=0) | |
else: | |
mean_powspctrm = np.mean(powspctrms_seg, axis=0) | |
if ii == 0: | |
mean_powspctrms = mean_powspctrm | |
else: | |
mean_powspctrms = np.c_[mean_powspctrms, mean_powspctrm] | |
# save mean powerspctrum as dataframe | |
df = pd.DataFrame(mean_powspctrms, columns=ch_pairs, index=freqs) | |
fname_save_mean = "mean_{0}_analysis{1}min_window{2}sec.csv".format( | |
kwd_underscored, | |
self._minute_analysis, | |
str_sec_window.replace('.','_')) | |
path_save_mean = os.path.join(dir_save, fname_save_mean) | |
df.to_csv(path_save_mean) | |
print("Mean values csv was saved to {0}".format(path_save_mean)) | |
def set_annot(self, kwd): | |
''' | |
set annotation description | |
get start time | |
''' | |
descripts = self.annotations.description | |
onset_times = self.annotations.onset | |
kwd_lower = kwd.lower().strip() | |
isNotFound = True | |
for idx, desc in enumerate(descripts): | |
if desc.lower().strip() == kwd_lower: | |
isNotFound = False | |
t_onset = onset_times[idx] | |
if kwd_lower in self.dict_kwd_t_onset.keys(): | |
# already registered | |
return | |
self.dict_kwd_t_onset[kwd]=t_onset | |
print("Search word:'{0}' was found at {1} sec.".format( | |
kwd, t_onset | |
)) | |
self.calc_powspctrms() | |
break | |
if isNotFound: | |
raise ValueError("Search Word:'{0}' was not found in annotations: \n{1}.\nPlease check the search word.".format( | |
kwd, '\n'.join([desc.strip() for desc in descripts]) | |
)) | |
def set_minute_analysis(self, minute): | |
''' | |
''' | |
self._minute_analysis = minute | |
if len(self.dict_kwd_t_onset)>0: | |
self.calc_powspctrms(); | |
def set_sec_window(self, sec): | |
''' | |
''' | |
self._sec_window = sec | |
if len(self.dict_kwd_t_onset)>0: | |
self.calc_powspctrms(); | |
def _check_ch_pairs(self, path_ch_pairs): | |
ch_pairs = pd.read_csv(path_ch_pairs) | |
# check columns info | |
if not all(ch_pairs.columns ==['G1','G2']): | |
raise ValueError("Msg From O: Header of channel pair csv file must be 'G1' and 'G2'.") | |
# check shape | |
if not ch_pairs.shape[1]==2: | |
raise ValueError("Msg From O: Channel pair values must be 2 columns.") | |
return ch_pairs | |
def _calc_powspctrms(self, G1='Fp1', G2='Fp2', kword='start 3Hz', | |
t_window=5, minute_analysis=3): | |
''' | |
calculate powerspctrm | |
''' | |
# get data,info | |
# self.set_annot(kword) | |
t_start = self.dict_kwd_t_onset[kword] | |
t_end = t_start + minute_analysis*60 | |
idx_start, idx_end = self._raw.time_as_index([t_start, t_end]) | |
raw_data = self._raw.get_data(start=idx_start, stop=idx_end) | |
ch_names = [x.upper() for x in self._raw.info['ch_names']] | |
sfreq = self._raw.info['sfreq'] | |
# calc necessary info | |
p_count_seg = int(sfreq * t_window) | |
freqs = np.arange(0, sfreq/2, 1/t_window) | |
# channel info | |
idx_g1 = ch_names.index(G1.upper()) | |
idx_g2 = ch_names.index(G2.upper()) | |
# channel data | |
raw_data_g1 = raw_data[idx_g1,:] | |
raw_data_g2 = raw_data[idx_g2,:] | |
raw_data_1ch = raw_data_g1 - raw_data_g2 | |
# hanning window | |
han_win = np.hanning(p_count_seg) | |
# segment data | |
float_segs = len(raw_data_1ch)/p_count_seg | |
n_segs = int(float_segs) | |
if float_segs > n_segs: | |
print("Data has a fraction not to be calculated because data point counts were not divided by time window length.") | |
raw_data_segs = raw_data_1ch[:int(n_segs*p_count_seg)].reshape((n_segs, p_count_seg)) | |
# calc powspctrm | |
powspctrms = np.zeros((n_segs, p_count_seg)) | |
for ii in np.arange(n_segs): | |
raw_data_seg = raw_data_segs[ii,:] | |
# demean | |
raw_data_seg_demean = raw_data_seg - np.mean(raw_data_seg) | |
# hanning window | |
raw_data_seg_win = raw_data_seg_demean * han_win | |
fft = np.fft.fft(raw_data_seg_win) | |
powspctrm = np.abs(fft) | |
powspctrms[ii,:] = powspctrm | |
return (powspctrms[:,:len(freqs)], freqs) | |
def _plot_powspctrms(self, kwd, idx, log_transform, xlim=[0, 50], title=None): | |
''' | |
plot powerspectrm | |
params: | |
idx: index for channel pair | |
xlim | |
title: if None (default), set by channel pair. | |
''' | |
try: | |
powspctrms = self.dict_powspctrms[kwd][idx] | |
except ValueError: | |
print("Calculate powerspectrum by self.calc_powspctrms() before plotting.") | |
return | |
if log_transform: | |
pows_plot = np.log10(powspctrms).T | |
else: | |
pows_plot = powspctrms.T | |
fig = plt.figure() | |
ax = fig.add_subplot(111) | |
ax.plot(self.freqs, pows_plot) | |
ax.plot(self.freqs, np.mean(pows_plot, axis=1), linewidth=3, color='black') | |
ax.set_xlim((xlim)) | |
if title is None: | |
ttl = ' '.join(list(self.ch_pairs.iloc[idx,:])) | |
else: | |
ttl = title | |
ax.set_title(ttl) | |
def _save_powspctrms(self, powspctrms, freqs, dir_save, fname_save, log_transform): | |
''' | |
save | |
''' | |
path_save = os.path.join(dir_save, fname_save) | |
pow_freqs = np.c_[freqs.reshape(len(freqs),1), powspctrms.T] | |
if log_transform: | |
pow_freqs_save = np.log10(pow_freqs) | |
else: | |
pow_freqs_save = pow_freqs | |
np.savetxt(path_save, pow_freqs_save, delimiter=',') | |
print("CSV data was saved to {0}.".format(path_save)) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
EDFを読み込み-> bipolar montageでpowerspectrum計算するスクリプト
0. 前提
0-1. モンタージュを示すcsvファイル
0-2. mneが使えること
1. 準備
1-1. スクリプトをwebから読み込む機能("import_gist")の追加
でmneが使える環境になります。入力待ちの左が"(base)"から"(mne)"に変わります。
この状態で、
で、pipを使って導入できます。
注意:たぶんpython3でしか走りません。
参照: githubで"cosacog/import_gist"を検索してください。あまり有用な情報はないですが、こんなもんだという雰囲気は感じられると思います。
1-2. pythonの起動
anaconda prompt上で、入力待ちが"(base)"だったら
で、入力待ちが"(mne)"になってることを確認します。何か作業して既に"(mne)"になっていれば不要です。
次に
とタイプすると、
と入力待ちが緑色で"[1]:"みたいになります。
1-3. ipythonでの作業
で準備完了です。ファイル読み込みに時間がかかったり、いろいろメッセージが出ます。
エラーが出たら以下を参照。
2. エラーが出るときはファイルのパスが違ってることが多いです
3. 解析:以下すべてipython上で
4. 解析時間の変更方法
デフォルトでは3分間切り取って、5秒を1区間として(=180/5=36エポック)の解析をするようにしてます。
もし3分間の解析時間を変更したい時は、3のスクリプトを走らせる前に
5秒1区間の時間を変えたい場合
を挿入してください。
10秒の時間幅で区切って解析すると、結果は1/10(=0.1) Hz刻み、5秒の時間幅だと1/5(=0.2)Hz刻み、てな感じになります。
5. 未解決問題