Skip to content

Instantly share code, notes, and snippets.

@dtxe
Last active November 20, 2020 15:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dtxe/a74bd43f4106281b86b9171bf5efe73d to your computer and use it in GitHub Desktop.
Save dtxe/a74bd43f4106281b86b9171bf5efe73d to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Dec 5 11:40:07 2019
@author: simeonwong
"""
import pyedflib
import os.path
import numpy as np
import datetime
import json
import pandas as pd
from PyQt5.QtWidgets import QFileDialog, QApplication
# A QApplication instance has to exist before any Qt dialog can be shown.
app = QApplication(['scalpextractor'])

# Ask the user for the metadata JSON describing the recording.
# getOpenFileName returns a (path, selected_filter) pair; only the path is kept.
meta_path, _ = QFileDialog.getOpenFileName(
    None,
    "Select an EEG meta file",
    "/d/gmi/1/rawdata/",
    "Meta JSON file (*.meta.json)",
)
if not meta_path:
    # A cancelled dialog yields an empty path; stop here with a clear message
    # instead of failing later while trying to open ''.
    raise SystemExit('No meta file selected - nothing to do.')
###############################################################################
#%% Parameters

# Explicit electrode labels to export. When left empty, every electrode listed
# in the channel-label table is considered (subject to the type filter below).
electrode_subset = []

# Electrode types to keep. Alternative scalp-only selection:
# electrode_type_subset = ['EEG', 'EMG', 'ECG', 'Misc']  # scalp
electrode_type_subset = [
    'SEEG',
    'iEEG',
    'EEG',
    'EMG',
    'ECG',
    'Misc',
    'TRIG',
]

# Time-of-day window to extract; with these defaults it crosses midnight.
start_time = datetime.time(hour=22, minute=0, second=0)
end_time = datetime.time(hour=8, minute=0, second=0)
print('Loading input file - %s' % meta_path)

#%% load metadata
# Paths inside the meta file are resolved relative to the meta file's folder.
(path_dir, path_file) = os.path.split(meta_path)

# The context manager closes the file even if JSON parsing fails.
with open(meta_path, 'r') as meta_f:
    meta = json.load(meta_f)

if 'channel_labels' in meta:
    # Optional CSV table; its `Pinbox` and `Label` columns give the mapping
    # used to rename the EDF channels.
    labels = pd.read_csv(os.path.join(path_dir, meta['channel_labels']))
    labels_dict = dict(zip(labels.Pinbox, labels.Label))
else:
    labels = None
    # Keep the mapping defined (but empty) so the relabelling loop further
    # down is a no-op instead of raising NameError.
    labels_dict = {}

# NOTE(review): the two positional zeros are pyedflib's annotations-mode and
# file-size-check arguments - confirm the intended modes against the docs.
f = pyedflib.EdfReader(os.path.join(path_dir, meta['filename']), 0, 0)
print('Opened input file...')
# get info from original file
edf_electrodes = f.getSignalLabels()

# Rename EDF channels using the Pinbox -> Label table, when one was loaded.
if labels is not None:
    for key, value in labels_dict.items():
        try:
            edf_electrodes[edf_electrodes.index(key)] = value
        except ValueError:
            # list.index raises ValueError when the pinbox name is not an EDF channel
            print('%s -> %s not found' % (key, value))

# if no subset specified, take all electrodes
if not electrode_subset:
    if labels is not None:
        electrode_subset = list(labels.Label)
    else:
        # No label table available: fall back to the channel names in the EDF.
        electrode_subset = list(edf_electrodes)

# filter electrodes by type
if electrode_type_subset:
    if labels is None:
        # The electrode type comes from the label table; without it there is
        # nothing to filter on.
        print('No channel label table available - skipping electrode type filter')
    else:
        # Left-merge on the shared 'Label' column to look up each electrode's Type.
        subset_table = pd.merge(pd.Series(electrode_subset, name='Label'), labels, how='left')
        type_matches = subset_table['Type'].isin(electrode_type_subset)
        electrode_subset = subset_table.loc[type_matches, 'Label'].to_list()

if not electrode_subset:
    raise ValueError('No electrodes selected - check electrode_subset and electrode_type_subset')

n_chan = len(electrode_subset)

# convert text labels to idx
missing_electrodes = [name for name in electrode_subset if name not in edf_electrodes]
if missing_electrodes:
    raise ValueError('Electrodes not present in the EDF file: %s'
                     % ', '.join(str(name) for name in missing_electrodes))
scalp_electrode_idx = [edf_electrodes.index(name) for name in electrode_subset]

# get info from original file
# The sample rate is read from the first selected channel only.
srate = f.getSampleFrequency(scalp_electrode_idx[0])
edf_starttime = f.getStartdatetime()
#%% copy header info
# Suggest an output name next to the input EDF: <input name>_scalpeeg.EDF
in_path, in_file = os.path.split(os.path.join(path_dir, meta['filename']))
in_file, _ = os.path.splitext(in_file)
suggested_output_name = os.path.join(in_path, in_file + "_scalpeeg.EDF")

# confirm output file
# getSaveFileName returns a (path, selected_filter) pair; only the path is kept.
output_dataset, _ = QFileDialog.getSaveFileName(
    None, "Output EDF filename", suggested_output_name, "EDF file (*.EDF)")
if not output_dataset:
    # A cancelled dialog yields an empty path; do not try to create a file named ''.
    raise SystemExit('No output file selected - nothing written.')

o = pyedflib.EdfWriter(output_dataset, n_chan)
print('Create output file - %s' % output_dataset)

# signal headers
# The output recording starts at start_time on the input recording's start date.
o.setStartdatetime(datetime.datetime.combine(edf_starttime.date(), start_time))

# Fetch the input headers once rather than once per selected channel.
input_headers = f.getSignalHeaders()
o.setSignalHeaders([input_headers[i] for i in scalp_electrode_idx])

# Override the copied labels with the selected electrode names.
for eidx, electrode_label in enumerate(electrode_subset):
    o.setLabel(eidx, electrode_label)
o.update_header()
print('Headers written...')
#%% copy signals
# compute start and end samples
# Sample offsets, counts and the buffer shape must be integers, so everything
# is rounded to whole samples up front.
samples_per_block = int(round(srate))  # one block = one second of data

window_start = datetime.datetime.combine(edf_starttime.date(), start_time)
# If end_time on the recording's start date is not after the recording start,
# the window ends on the following day.
days_delta = 1 if datetime.datetime.combine(edf_starttime.date(), end_time) <= edf_starttime else 0
window_end = datetime.datetime.combine(
    edf_starttime.date() + datetime.timedelta(days=days_delta), end_time)

start_sample = int(round((window_start - edf_starttime).total_seconds() * samples_per_block))
end_sample = int(round((window_end - edf_starttime).total_seconds() * samples_per_block))
nblocks = int(round((end_sample - start_sample) / samples_per_block))

try:
    if start_sample < 0:
        # A negative offset would address samples before the start of the file.
        raise ValueError('Requested start time %s is before the recording start %s'
                         % (window_start, edf_starttime))

    # One row per output channel, reused for every one-second block.
    sigbuf = np.zeros((n_chan, samples_per_block), dtype=np.int32)

    for kk in range(nblocks):
        c_start = start_sample + (kk * samples_per_block)

        # Fill each row in place with the digital samples of its source channel.
        for cc, source_idx in enumerate(scalp_electrode_idx):
            f.read_digital_signal(source_idx, c_start, samples_per_block, sigbuf[cc, :])

        o.writeSamples(sigbuf, digital=True)

    o.update_header()
finally:
    #%% clean up
    # Close the writer and the reader even when the copy fails part-way.
    o.close()
    f.close()

print('All done!')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment