Skip to content

Instantly share code, notes, and snippets.

@cosacog
Last active March 28, 2019 11:32
Show Gist options
  • Save cosacog/6f31d26da2fc41c43ef76164772f5629 to your computer and use it in GitHub Desktop.
Save cosacog/6f31d26da2fc41c43ef76164772f5629 to your computer and use it in GitHub Desktop.
EDFを読み込んでbipolar montageのpowerspctrumを計算. 説明は下にあります。
import mne, os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
class SurfaceEmg:
def __init__(self, path_edf, path_ch_pairs, sec_window=5.0, minute_analysis=3):
# load edf
self._raw = mne.io.read_raw_edf(path_edf, preload=True)
# load channel pairs
self.ch_pairs = self._check_ch_pairs(path_ch_pairs)
self.n_pairs = self.ch_pairs.shape[0]
# self.calc_powspctrms() # set self.powspctrms_list, self.freqs, self.n_epochs
self.annotations = self._raw.annotations # .description, .onset etc
self.dict_kwd_t_onset = {}
self._minute_analysis = minute_analysis # minutes
self._sec_window = sec_window
print("Done.")
def calc_powspctrms(self):
'''
set powerspctram for all channel pair
'''
self.dict_powspctrms = {}
for kwd in self.dict_kwd_t_onset.keys():
print("Calculate powerspectrum based on tag:'{0}'...".format(kwd))
powspctrms_list = []
for ii in np.arange(self.n_pairs):
g1 = self.ch_pairs.iloc[ii, 0]
g2 = self.ch_pairs.iloc[ii, 1]
print("Calculating {0}-{1} pair...".format(g1, g2))
powspctrms, freqs = self._calc_powspctrms(G1=g1, G2=g2, kword=kwd,
t_window=self._sec_window, minute_analysis=self._minute_analysis)
powspctrms_list.append(powspctrms)
self.dict_powspctrms[kwd] = powspctrms_list
self.n_epochs = powspctrms.shape[0]
self.freqs = freqs
print("Done.")
def get_annots(self):
print(self.annotations.description)
def get_channel_pairs(self):
print(self.ch_pairs)
def plot_all_ch_powspctrms(self, kwd, xlim=[0, 50], log_transform=False):
'''
plot powerspectrams for all channel pair
kwd: search word. e.g.'start 3Hz'
'''
for ii in np.arange(self.n_pairs):
ttl_ch = ' '.join(list(self.ch_pairs.iloc[ii,:]))
ttl = str(ii) + '. ' + ttl_ch + ' :'+ kwd
self._plot_powspctrms(kwd, ii, log_transform, xlim=xlim, title=ttl)
def save_powspctrms(self, kwd ,dir_save, freq_lim=(0,50), log_transform=False):
'''
save powerspectrm
set kwd
'''
powspctrms = self.dict_powspctrms[kwd]
freqs = self.freqs[(self.freqs>=freq_lim[0]) & (self.freqs<=freq_lim[1])]
kwd_underscored = kwd.replace(' ', '_')
str_sec_window = str(self._sec_window)
ch_pairs = []
for ii in np.arange(self.n_pairs):
ch_pair = '_'.join(list(self.ch_pairs.iloc[ii,:]))
ch_pairs.append(ch_pair)
fname_save = "{0}_{1}_analysis{2}min_window{3}sec.csv".format(
ch_pair,
kwd_underscored,
self._minute_analysis,
str_sec_window.replace('.','_'))
powspctrms_seg = powspctrms[ii][:,(self.freqs >= freq_lim[0])&(self.freqs <= freq_lim[1])]
# save power spectrum epochs
self._save_powspctrms(powspctrms_seg, freqs, dir_save, fname_save, log_transform)
# save mean powerspectrum
if log_transform:
mean_powspctrm = np.mean(np.log10(powspctrms_seg), axis=0)
else:
mean_powspctrm = np.mean(powspctrms_seg, axis=0)
if ii == 0:
mean_powspctrms = mean_powspctrm
else:
mean_powspctrms = np.c_[mean_powspctrms, mean_powspctrm]
# save mean powerspctrum as dataframe
df = pd.DataFrame(mean_powspctrms, columns=ch_pairs, index=freqs)
fname_save_mean = "mean_{0}_analysis{1}min_window{2}sec.csv".format(
kwd_underscored,
self._minute_analysis,
str_sec_window.replace('.','_'))
path_save_mean = os.path.join(dir_save, fname_save_mean)
df.to_csv(path_save_mean)
print("Mean values csv was saved to {0}".format(path_save_mean))
def set_annot(self, kwd):
'''
set annotation description
get start time
'''
descripts = self.annotations.description
onset_times = self.annotations.onset
kwd_lower = kwd.lower().strip()
isNotFound = True
for idx, desc in enumerate(descripts):
if desc.lower().strip() == kwd_lower:
isNotFound = False
t_onset = onset_times[idx]
if kwd_lower in self.dict_kwd_t_onset.keys():
# already registered
return
self.dict_kwd_t_onset[kwd]=t_onset
print("Search word:'{0}' was found at {1} sec.".format(
kwd, t_onset
))
self.calc_powspctrms()
break
if isNotFound:
raise ValueError("Search Word:'{0}' was not found in annotations: \n{1}.\nPlease check the search word.".format(
kwd, '\n'.join([desc.strip() for desc in descripts])
))
def set_minute_analysis(self, minute):
'''
'''
self._minute_analysis = minute
if len(self.dict_kwd_t_onset)>0:
self.calc_powspctrms();
def set_sec_window(self, sec):
'''
'''
self._sec_window = sec
if len(self.dict_kwd_t_onset)>0:
self.calc_powspctrms();
def _check_ch_pairs(self, path_ch_pairs):
ch_pairs = pd.read_csv(path_ch_pairs)
# check columns info
if not all(ch_pairs.columns ==['G1','G2']):
raise ValueError("Msg From O: Header of channel pair csv file must be 'G1' and 'G2'.")
# check shape
if not ch_pairs.shape[1]==2:
raise ValueError("Msg From O: Channel pair values must be 2 columns.")
return ch_pairs
def _calc_powspctrms(self, G1='Fp1', G2='Fp2', kword='start 3Hz',
t_window=5, minute_analysis=3):
'''
calculate powerspctrm
'''
# get data,info
# self.set_annot(kword)
t_start = self.dict_kwd_t_onset[kword]
t_end = t_start + minute_analysis*60
idx_start, idx_end = self._raw.time_as_index([t_start, t_end])
raw_data = self._raw.get_data(start=idx_start, stop=idx_end)
ch_names = [x.upper() for x in self._raw.info['ch_names']]
sfreq = self._raw.info['sfreq']
# calc necessary info
p_count_seg = int(sfreq * t_window)
freqs = np.arange(0, sfreq/2, 1/t_window)
# channel info
idx_g1 = ch_names.index(G1.upper())
idx_g2 = ch_names.index(G2.upper())
# channel data
raw_data_g1 = raw_data[idx_g1,:]
raw_data_g2 = raw_data[idx_g2,:]
raw_data_1ch = raw_data_g1 - raw_data_g2
# hanning window
han_win = np.hanning(p_count_seg)
# segment data
float_segs = len(raw_data_1ch)/p_count_seg
n_segs = int(float_segs)
if float_segs > n_segs:
print("Data has a fraction not to be calculated because data point counts were not divided by time window length.")
raw_data_segs = raw_data_1ch[:int(n_segs*p_count_seg)].reshape((n_segs, p_count_seg))
# calc powspctrm
powspctrms = np.zeros((n_segs, p_count_seg))
for ii in np.arange(n_segs):
raw_data_seg = raw_data_segs[ii,:]
# demean
raw_data_seg_demean = raw_data_seg - np.mean(raw_data_seg)
# hanning window
raw_data_seg_win = raw_data_seg_demean * han_win
fft = np.fft.fft(raw_data_seg_win)
powspctrm = np.abs(fft)
powspctrms[ii,:] = powspctrm
return (powspctrms[:,:len(freqs)], freqs)
def _plot_powspctrms(self, kwd, idx, log_transform, xlim=[0, 50], title=None):
'''
plot powerspectrm
params:
idx: index for channel pair
xlim
title: if None (default), set by channel pair.
'''
try:
powspctrms = self.dict_powspctrms[kwd][idx]
except ValueError:
print("Calculate powerspectrum by self.calc_powspctrms() before plotting.")
return
if log_transform:
pows_plot = np.log10(powspctrms).T
else:
pows_plot = powspctrms.T
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(self.freqs, pows_plot)
ax.plot(self.freqs, np.mean(pows_plot, axis=1), linewidth=3, color='black')
ax.set_xlim((xlim))
if title is None:
ttl = ' '.join(list(self.ch_pairs.iloc[idx,:]))
else:
ttl = title
ax.set_title(ttl)
def _save_powspctrms(self, powspctrms, freqs, dir_save, fname_save, log_transform):
'''
save
'''
path_save = os.path.join(dir_save, fname_save)
pow_freqs = np.c_[freqs.reshape(len(freqs),1), powspctrms.T]
if log_transform:
pow_freqs_save = np.log10(pow_freqs)
else:
pow_freqs_save = pow_freqs
np.savetxt(path_save, pow_freqs_save, delimiter=',')
print("CSV data was saved to {0}.".format(path_save))
@cosacog
Copy link
Author

cosacog commented Dec 7, 2018

EDFを読み込み-> bipolar montageでpowerspectrum計算するスクリプト

0. 前提

0-1. モンタージュを示すcsvファイル

  • EDFファイルは当然ですが、その他にmontageのcsvが必要です。
    • 以下の表ような感じ. excelか何かで作って保存してください。
    • 'G1','G2'の行も必要です。
G1 G2
Fp1 Fp2
F3 F4
C3 C4
P3 P4

0-2. mneが使えること

  • 導入の仕方はこの辺 (mneのサイト)を参照ください。

1. 準備

  • このサイトのスクリプトを使えるようにします。
  • 普通にダウンロードしてインポートしてもよいですが、、、

1-1. スクリプトをwebから読み込む機能("import_gist")の追加

  • このサイトのスクリプトをダウンロードして読み込むためのモジュールを導入します。
  • "import_gist" てのはカスタムのモジュール(スクリプト)で、anaconda prompt上から
conda activate mne

でmneが使える環境になります。入力待ちの左が"(base)"から"(mne)"に変わります。

この状態で、

pip install git+https://github.com/cosacog/import_gist

で、pipを使って導入できます。
注意:たぶんpython3でしか走りません。
参照: githubで"cosacog/import_gist"を検索してください。あまり有用な情報はないですが、こんなもんだという雰囲気は感じられると思います。

1-2. pythonの起動

anaconda prompt上で、入力待ちが"(base)"だったら

conda activate mne

で、入力待ちが"(mne)"になってることを確認します。何か作業して既に"(mne)"になっていれば不要です。

次に

ipython --matplotlib

とタイプすると、
ipython

と入力待ちが緑色で"[1]:"みたいになります。

1-3. ipythonでの作業

  • ipython上で以下入力します。
  • メモ帳か、できれ何かエディタ(例えばnotepad++:無料)で先に書いてコピペした方がエラーがでた時に修正しやすいです。またエディタは色がつくので見やすかったり、入力支援があるので、先々で役に立ちます。
  • 初心者のうちは1行ずつコピペしてエラーが出ないか確認しましょう。
from import_gist import *
url_gist = 'https://gist.github.com/cosacog/6f31d26da2fc41c43ef76164772f5629' # このgistのurl
mdl = import_gist(url_gist) 

# ここからファイル読み込み
path_edf = '/path/to/image.edf' # 読み込むEDFファイルのパスです。適当に書き換えてください
path_csv = '/path/to/montage.csv' # 読み込むモンタージュのcsv (0.前提参照)のパスです。適当に書き換えてください。
dir_save = '/dir/to/save_results.csv # 解析結果を保存するディレクトリ。適当に書き換えてください。
emg_data = mdl.SurfaceEmg(path_edf, path_csv)

で準備完了です。ファイル読み込みに時間がかかったり、いろいろメッセージが出ます。
エラーが出たら以下を参照。

2. エラーが出るときはファイルのパスが違ってることが多いです

  • path_edf = ''の中身、path_csv = '' の中身以外はコピペで使えます。なので、エラーが出たらここの問題が非常に考えやすいです。
  • "FileNotFoundError: [Errno 2] No such file or directory: 云々"と出たら間違いありません。
  • 確認のため以下をタイプして、"False"と出たら、ファイルが"ない"ということです。
  • それぞれpath_edf, path_csvのパスの記載が間違っていることになります。
import os
os.path.exists(path_edf)
os.path.exists(path_csv)
  • 上記 "False"と出たら、、、
    • 大文字、小文字の区別とかいかがでしょうか。
    • 全角、半角は間違ってないでしょうか。
    • パス区切り文字のうち、"\"(バックスラッシュ)はWindowsのいわゆるエクスプローラ上でしかうまく認識されません。"/"(スラッシュ)に書き換えます。
    • パスに スペース が入っているとトラブルの元です。スペースのないディレクトリに移して設定し直しましょう。
    • パスに 日本語 が入っているとトラブルの元です。アルファベットのディレクトリに移して設定し直しましょう。
    • パスに "-"(ハイフン) が入っているとトラブる"かも"知れません。"_"(アンダーバー、アンダースコア)の方が問題が少ないです。

3. 解析:以下すべてipython上で

kwd = 'start 3Hz' #とか
# 検索ワードをセット+power spectrum計算
emg_data.set_annot(kwd) # kwdをファイルから探します。見つからなかったらアラートを出します。
# 結果のプロット
emg_data.plot_all_ch_powspctrms(kwd, xlim=(0, 20)) # log変換したい時は、後に"log_transform=True"を追加
# 結果の保存
emg_data.save_powspctrms(kwd, dir_save, freq_lim=(0, 50)) # log変換したい時は、後に"log_transform=True"を追加
  • 各チャンネルのパワースペクトラムがプロットされると成功です。
  • dir_saveで指定したディレクトリに結果のcsvが保存されます。

4. 解析時間の変更方法

デフォルトでは3分間切り取って、5秒を1区間として(=180/5=36エポック)の解析をするようにしてます。
もし3分間の解析時間を変更したい時は、3のスクリプトを走らせる前に

emg_data.set_minute_analysis(2) # 2分の場合

5秒1区間の時間を変えたい場合

emg_data.set_sec_window(10.0) # 10秒の場合

を挿入してください。
10秒の時間幅で区切って解析すると、結果は1/10(=0.1) Hz刻み、5秒の時間幅だと1/5(=0.2)Hz刻み、てな感じになります。

5. 未解決問題

  • pipでgithubのモジュールをインポートしてもspyderで反映されない.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment