Skip to content

Instantly share code, notes, and snippets.

@miki998
Created January 13, 2022 09:48
Show Gist options
  • Save miki998/0d5140fc5aaf09befc36fca3cc2320b7 to your computer and use it in GitHub Desktop.
Save miki998/0d5140fc5aaf09befc36fca3cc2320b7 to your computer and use it in GitHub Desktop.
import pyxdf
import numpy as np
import os
import argparse
import csv
import pandas as pd
import shutil
# Script that takes a XDF file and converts all the data to CSV files.
def eyes(stream, output_path):
time_series = np.array(stream["time_series"])
time_stamps = np.array(stream["time_stamps"])
with open(os.path.join(output_path, 'eyes.csv'), 'w', newline='') as file_:
writer = csv.writer(file_, delimiter=";")
writer.writerow(["left screen x",
"left screen y",
"left pupil",
"left gaze origin ucs x",
"left gaze origin ucs y",
"left gaze origin ucs z",
"left gaze position ucs x",
"left gaze position ucs y",
"left gaze position ucs z",
"left gaze origin validity",
"left gaze position validity",
"right screen x",
"right screen y",
"right pupil",
"right gaze origin ucs x",
"right gaze origin ucs y",
"right gaze origin ucs z",
"right gaze position ucs x",
"right gaze position ucs y",
"right gaze position ucs z",
"right gaze origin validity",
"right gaze position validity",
"device timestamp",
"system timestamp",
"LSL timestamp"])
for i in range(time_series.shape[0]):
writer.writerow([time_series[i,0],
time_series[i,1],
time_series[i,2],
time_series[i,19],
time_series[i,20],
time_series[i,21],
time_series[i,13],
time_series[i,14],
time_series[i,15],
time_series[i,9],
time_series[i,7],
time_series[i,3],
time_series[i,4],
time_series[i,5],
time_series[i,16],
time_series[i,17],
time_series[i,18],
time_series[i,10],
time_series[i,11],
time_series[i,12],
time_series[i,8],
time_series[i,6],
time_series[i,22],
time_series[i,23],
time_stamps[i]])
def eeg(stream, output_path):
sampling_rate = stream["info"]["effective_srate"]
time_series = np.array(stream["time_series"])
time_stamps = np.array(stream["time_stamps"])
with open(os.path.join(output_path, 'eeg.csv'), 'w', newline='') as file_:
writer = csv.writer(file_, delimiter=";")
writer.writerow(["Channel1", "Channel2", "Channel3", "Channel4", "Channel5", "Channel6", "Channel7", "Channel8", "Device timestamp", "LSL timestamp"])
for i in range(time_series.shape[0]):
writer.writerow([time_series[i,0], time_series[i,1], time_series[i,2], time_series[i,3], time_series[i,4], time_series[i,5], time_series[i,6], time_series[i,7], time_series[i,8], time_stamps[i]])
return sampling_rate
def eeg_device(stream, output_path, sampling_rate):
time_series = np.array(stream["time_series"])
time_stamps = np.array(stream["time_stamps"])
with open(os.path.join(output_path, 'eeg_device.csv'), 'w', newline='') as file_:
writer = csv.writer(file_, delimiter=";")
writer.writerow(["Number channels", "Frequency", "Gain", "Version", "Release", "Product", "MAC", "Effective Sampling Rate"])
# There is only need to save one of those because they are all the same
writer.writerow([time_series[0,0], time_series[0,1], time_series[0,2], time_series[0,3], time_series[0,4], time_series[0,5], time_series[0,6], sampling_rate])
def ecg(stream, output_path):
time_series = np.array(stream["time_series"])
time_stamps = np.array(stream["time_stamps"])
with open(os.path.join(output_path, 'ecg.csv'), 'w', newline='') as file_:
writer = csv.writer(file_, delimiter=";")
writer.writerow(["Channel1", "LSL timestamp"])
for i in range(time_series.shape[0]):
writer.writerow([time_series[i,0], time_stamps[i]])
def obs(stream, output_path):
time_series = np.array(stream["time_series"])
time_stamps = np.array(stream["time_stamps"])
with open(os.path.join(output_path, 'obs.csv'), 'w', newline='') as file_:
writer = csv.writer(file_, delimiter=";")
writer.writerow(["Frame number", "OBS timestamp", "LSL timestamp"])
for i in range(time_series.shape[0]):
writer.writerow([time_series[i,1], time_series[i,0], time_stamps[i]])
def mouse_position(stream, output_path):
time_series = np.array(stream["time_series"])
time_stamps = np.array(stream["time_stamps"])
with open(os.path.join(output_path, 'mouse_position.csv'), 'w', newline='') as file_:
writer = csv.writer(file_, delimiter=";")
writer.writerow(["X pos", "Y pos", "LSL timestamp"])
for i in range(time_series.shape[0]):
writer.writerow([time_series[i,0], time_series[i,1], time_stamps[i]])
def mouse_buttons(stream, output_path):
time_series = np.array(stream["time_series"])
time_stamps = np.array(stream["time_stamps"])
with open(os.path.join(output_path, 'mouse_buttons.csv'), 'w', newline='') as file_:
writer = csv.writer(file_, delimiter=";")
writer.writerow(["Event", "LSL timestamp"])
for i in range(time_series.shape[0]):
writer.writerow([time_series[i,0], time_stamps[i]])
def keyboard(stream, output_path):
time_series = np.array(stream["time_series"])
time_stamps = np.array(stream["time_stamps"])
with open(os.path.join(output_path, 'keyboard.csv'), 'w', newline='') as file_:
writer = csv.writer(file_, delimiter=";")
writer.writerow(["Event", "LSL timestamp"])
for i in range(time_series.shape[0]):
writer.writerow([time_series[i,0], time_stamps[i]])
def audiomic(stream, output_path):
time_series = np.array(stream["time_series"])
time_stamps = np.array(stream["time_stamps"])
with open(os.path.join(output_path, 'audiomic.csv'), 'w', newline='') as file_:
writer = csv.writer(file_, delimiter=";")
writer.writerow(["Sample", "LSL timestamp"])
for i in range(time_series.shape[0]):
writer.writerow([time_series[i,0], time_stamps[i]])
def events(stream, output_path):
time_series = np.array(stream["time_series"])
time_stamps = np.array(stream["time_stamps"])
with open(os.path.join(output_path, 'events.csv'), 'w', newline='') as file_:
writer = csv.writer(file_, delimiter=";")
writer.writerow(["Event", "LSL timestamp"])
for i in range(time_series.shape[0]):
writer.writerow([time_series[i,0], time_stamps[i]])
def popups(stream, output_path):
time_series = np.array(stream["time_series"])
time_stamps = np.array(stream["time_stamps"])
with open(os.path.join(output_path, 'popups.csv'), 'w', newline='') as file_:
writer = csv.writer(file_, delimiter=";")
writer.writerow(["Time appearance", "Time submit", "Video time", "Answer", "LSL timestamp"])
for i in range(time_series.shape[0]):
writer.writerow([time_series[i,0], time_series[i,1], time_series[i,2], time_series[i,3], time_stamps[i]])
def retrospective(stream, output_path):
time_series = np.array(stream["time_series"])
time_stamps = np.array(stream["time_stamps"])
with open(os.path.join(output_path, 'retrospective.csv'), 'w', newline='') as file_:
writer = csv.writer(file_, delimiter=";")
writer.writerow(["Time appearance", "Time submit", "Video time", "Answer", "LSL timestamp"])
for i in range(time_series.shape[0]):
writer.writerow([time_series[i,0], time_series[i,1], time_series[i,2], time_series[i,3], time_stamps[i]])
def questionnaire(stream, output_path, type_):
if type_ == "mcq":
filename = "questionnaire_mcq.csv"
elif type_ == "post":
filename = "questionnaire_post.csv"
time_series = np.array(stream["time_series"])[0][0]
if time_series[0] == "/":
time_series = time_series[1:]
time_series = time_series.split("/")
time_stamps = np.array(stream["time_stamps"])
with open(os.path.join(output_path, filename), 'w', newline='') as file_:
writer = csv.writer(file_, delimiter=";")
nb_questions = len(time_series)
header = []
for i in range(1, nb_questions+1):
header.append("Question" + str(i))
header.append("LSL timestamp")
writer.writerow(header)
writer.writerow(time_series + [time_stamps[0]])
def biosig(stream, output_path):
time_series = np.array(stream["time_series"]).squeeze()
time_stamps = np.array(stream["time_stamps"]).squeeze()
filename = 'click.csv'
with open(os.path.join(output_path, filename), 'w', newline='') as file_:
writer = csv.writer(file_, delimiter=";")
header = []
header.append("Type of click")
header.append("LSL timestamp")
writer.writerow(header)
for i in range(len(time_series)):
writer.writerow([time_series[i], time_stamps[i]])
def ppgsig(stream, output_path):
time_series = np.array(stream["time_series"]).squeeze()
time_stamps = np.array(stream["time_stamps"]).squeeze()
filename = 'ppg.csv'
with open(os.path.join(output_path, filename), 'w', newline='') as file_:
writer = csv.writer(file_, delimiter=";")
header = []
header.append("PPG Info")
header.append("LSL timestamp")
writer.writerow(header)
for i in range(len(time_series)):
writer.writerow([time_series[i], time_stamps[i]])
def d435sig(stream, output_path):
time_series = np.array(stream["time_series"]).squeeze()
time_stamps = np.array(stream["time_stamps"]).squeeze()
filename = 'counterImage.csv'
with open(os.path.join(output_path, filename), 'w', newline='') as file_:
writer = csv.writer(file_, delimiter=";")
header = []
header.append("Number Image")
header.append("LSL timestamp")
writer.writerow(header)
for i in range(len(time_series)):
writer.writerow([time_series[i], time_stamps[i]])
def extract_and_save(streams, output_path):
eeg_done = False
names = []
print(len(streams))
# Sort inversely the streams by name to make sure that you decode "Headset-EEG" before "EEG-Device"
for stream in streams:
name = stream["info"]["name"][0]
names.append(name)
streams = np.array(streams)
indeces = np.argsort(names)[::-1]
streams = streams[indeces]
for index, stream in enumerate(streams):
stream_name = stream["info"]["name"][0]
if stream_name == "Headset-EEG":
print("Doing Headset-EEG")
sampling_rate = eeg(stream, output_path)
pass
elif stream_name == "EEG-Device":
print("Doing EEG-Device")
eeg_device(stream, output_path, sampling_rate)
pass
elif stream_name == "Polar_ECG":
print("Doing Polar_ECG")
ecg(stream, output_path)
pass
elif stream_name == "Tobii":
print("Doing Tobii")
eyes(stream, output_path)
pass
elif stream_name == "OBS":
print("Doing OBS")
obs(stream, output_path)
pass
elif stream_name == "EduBCI-Admin":
print("Doing EduBCI-Admin")
events(stream, output_path)
pass
elif stream_name == "EduBCI-PopUps":
print("Doing EduBCI-PopUps")
popups(stream, output_path)
pass
elif stream_name == "EduBCI-Retrospective":
print("Doing EduBCI-Retrospective")
retrospective(stream, output_path)
pass
elif stream_name == "EduBCI-QuestionnaireMCQ":
print("Doing EduBCI-QuestionnaireMCQ")
questionnaire(stream, output_path, type_="mcq", )
pass
elif stream_name == "EduBCI-QuestionnairePost":
print("Doing EduBCI-QuestionnairePost")
questionnaire(stream, output_path, type_="post")
pass
elif stream_name == "MousePosition":
print("Doing MousePosition")
mouse_position(stream, output_path)
pass
elif stream_name == "MouseButtons":
print("Doing MouseButtons")
mouse_buttons(stream, output_path)
pass
elif stream_name == "Keyboard":
print("Doing Keyboard")
keyboard(stream, output_path)
pass
elif stream_name == "AudioMic":
print("Doing AudioMic")
#audiomic(stream, output_path)
elif stream_name == "BioSigMeasure-Events":
print("Doing BioSignal Events")
biosig(stream, output_path)
elif stream_name == "PPG-CMS50D+":
print("Doing PPG-CMS50D+")
ppgsig(stream, output_path)
elif stream_name == "D435":
print("Doing D435")
d435sig(stream, output_path)
def make_dir(path, remove=False):
if remove:
if os.path.exists(path):
shutil.rmtree(path)
os.makedirs(path)
else:
if not os.path.exists(path):
os.makedirs(path)
def search_filenames(path):
names = os.listdir(path)
for i in range(0, len(names)):
name = names[i]
if name.find("."):
names[i] = name.split(".")[0]
return names
def search_extracted_filenames(path):
names = []
videos = os.listdir(path)
for video in videos:
path_video = os.path.join(path, video)
for subject in os.listdir(path_video):
path_subject = os.path.join(path_video, subject)
path_event = os.path.join(path_subject, "events.csv")
with open(path_event, "r") as csv_file:
reader = csv.reader(csv_file, delimiter=";")
for row in reader:
if "Date" in row[0]:
date = row[0].split("Date: ")[-1]
break
name_xdf = subject + "_" + date
names.append(name_xdf)
return names
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Extracts data from xdf file and saves it in several csv files. It looks for all the xdf files inside data/raw and extract those that are not already extracted in data/extracted')
parser.add_argument('--xdf_file', type=str, help="Path of the xdf file if you only want to do one.")
args = vars(parser.parse_args())
if args["xdf_file"]:
output_path = "single_data"
make_dir(output_path)
print("Loading XDF")
streams, fileheader = pyxdf.load_xdf(args["xdf_file"])
print("XDF loaded!")
extract_and_save(streams, output_path)
else:
raw_path = os.path.join("data", "raw")
extracted_path = os.path.join("data", "extracted")
make_dir(extracted_path, remove=False)
make_dir(os.path.join(extracted_path, "Paris_A"))
make_dir(os.path.join(extracted_path, "Paris_B"))
filenames_raw = search_filenames(raw_path)
# filenames_extracted = search_extracted_filenames(extracted_path)
# filenames = np.setdiff1d(filenames_raw, filenames_extracted)
filenames = filenames_raw
print("To be extracted the following files: " + str(filenames))
for filename in filenames:
print("Extracting " + str(filename))
subject = filename.split("_")[0]
output_path = os.path.join("data", "extracted", subject)
raw_path = raw_path = os.path.join("data", "raw", filename+".xdf")
make_dir(output_path)
print("Loading XDF")
streams, fileheader = pyxdf.load_xdf(raw_path)
print("XDF loaded!")
extract_and_save(streams, output_path)
# events_df = pd.read_csv(os.path.join(output_path, "events.csv"), delimiter=";")
# video = events_df[events_df["Event"].str.contains("Video: ")]["Event"].to_numpy()[0].split("Video: ")[1]
# shutil.move(output_path, os.path.join(extracted_path, video))
print("All files are extracted")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment