Last active
February 6, 2023 17:37
-
-
Save GEJ1/68609eb251fcf02efec56053f3057848 to your computer and use it in GitHub Desktop.
Reads in .asc data files from EyeLink and produces pandas dataframes for further analysis
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def ParseEyeLinkAsc(elFilename):
    """Read an EyeLink .asc data file and return its contents as pandas dataframes.

    dfRec,dfMsg,dfFix,dfSacc,dfBlink,dfSamples = ParseEyeLinkAsc(elFilename)

    The file is read once, every line is classified by its first token, and
    each group of lines is then parsed from memory as whitespace-delimited
    text.

    INPUTS:
    -elFilename is a string giving the path of an EyeLink .asc data file.

    OUTPUTS:
    -dfRec contains information about recording periods (often trials):
     columns tStart, tEnd, xRes, yRes (from the START and END lines).
    -dfMsg contains information about messages (usually sent from stimulus
     software): columns time, text.
    -dfFix contains information about fixations (EFIX lines).
    -dfSacc contains information about saccades (ESACC lines).
    -dfBlink contains information about blinks (EBLINK lines). It is an empty
     dataframe when the file has no blinks or they cannot be parsed.
    -dfSamples contains information about individual samples: tSample plus
     X, Y and Pupil for the L and R eye. Columns of an eye that was not
     recorded are filled with NaN. Only samples after the last line
     containing '!CAL' are kept (all samples if there is no such line).

    RAISES:
    -ValueError if the file contains no EFIX lines, because the recorded
     eye(s) - and therefore the sample column layout - are taken from them.

    Created 7/31/18-8/15/18 by DJ. (https://github.com/djangraw/)
    Updated 11/12/18 by DJ - switched from "trials" to "recording periods" for experiments with continuous recording
    Updated 7/4/19 by DJ - detects and handles monocular sample data.
    Modified by Gustavo Juantorena (github.com/gej1) 06/01/22
    """
    # Import packages
    import io
    import time

    import numpy as np
    import pandas as pd

    def _parse_rows(rows, usecols, names):
        """Parse whitespace-delimited text lines into a dataframe with columns `names`."""
        if not rows:
            # No events of this kind: return an empty table with the usual columns.
            return pd.DataFrame(columns=names)
        # Strip each line and re-join with '\n' so that pandas sees exactly
        # one record per classified line, whatever terminator the line had.
        text = '\n'.join(row.strip() for row in rows)
        df = pd.read_csv(io.StringIO(text), header=None, sep=r'\s+',
                         usecols=list(usecols), low_memory=False)
        df.columns = names
        return df

    # ===== READ IN FILES ===== #
    # Read in EyeLink file
    print('Reading in EyeLink file %s...'%elFilename)
    t = time.time()
    with open(elFilename,'r') as f:
        fileTxt0 = f.read().splitlines(True) # split into lines (terminators kept)
    print('Done! Took %f seconds.'%(time.time()-t))

    # Separate lines into samples and messages
    print('Sorting lines...')
    t = time.time()
    lineType = []
    # Index of the first line that may hold recorded samples. Defaults to 0 so
    # that files without a '!CAL' line still yield their samples.
    iStartRec = 0
    for iLine, line in enumerate(fileTxt0):
        tokens = line.split()
        if len(line)<3 or not tokens:
            lineType.append('EMPTY')
        elif line.startswith(('*', '>>>>>')):
            lineType.append('COMMENT')
        elif tokens[0][0].isdigit() or tokens[0].startswith('-'):
            lineType.append('SAMPLE')
        else:
            # Event lines are labelled by their keyword (START, END, MSG, EFIX, ...).
            lineType.append(tokens[0])
        if '!CAL' in line: # TODO: Find more general way of determining if recording has started
            iStartRec = iLine+1
    print('Done! Took %f seconds.'%(time.time()-t))

    def _lines_of(kind, first=0):
        """Return the lines classified as `kind`, starting at line index `first`."""
        return [line for line, k in zip(fileTxt0[first:], lineType[first:]) if k == kind]

    # ===== PARSE EYELINK FILE ===== #
    # Trials
    print('Parsing recording markers...')
    dfRecStart = _parse_rows(_lines_of('START'), [1], ['tStart'])
    dfRecEnd = _parse_rows(_lines_of('END'), [1,5,6], ['tEnd','xRes','yRes'])
    # combine trial info
    dfRec = pd.concat([dfRecStart,dfRecEnd],axis=1)
    nRec = dfRec.shape[0]
    print('%d recording periods found.'%nRec)

    # Import Messages
    print('Parsing stimulus messages...')
    t = time.time()
    tMsg = []
    txtMsg = []
    for line in _lines_of('MSG'):
        # separate MSG prefix and timestamp from rest of message
        info = line.split()
        tMsg.append(int(info[1]))
        txtMsg.append(' '.join(info[2:]))
    dfMsg = pd.DataFrame({'time':tMsg, 'text':txtMsg})
    print('Done! Took %f seconds.'%(time.time()-t))

    # Import Fixations
    print('Parsing fixations...')
    t = time.time()
    dfFix = _parse_rows(_lines_of('EFIX'), range(1,8),
                        ['eye','tStart','tEnd','duration','xAvg','yAvg','pupilAvg'])
    print('Done! Took %f seconds.'%(time.time()-t))

    # Saccades
    print('Parsing saccades...')
    t = time.time()
    dfSacc = _parse_rows(_lines_of('ESACC'), range(1,11),
                         ['eye','tStart','tEnd','duration','xStart','yStart','xEnd','yEnd','ampDeg','vPeak'])
    print('Done! Took %f seconds.'%(time.time()-t))

    # Blinks (best effort: a file with unparseable blink lines is still usable)
    print('Parsing blinks...')
    t = time.time()
    try:
        dfBlink = _parse_rows(_lines_of('EBLINK'), range(1,5),
                              ['eye','tStart','tEnd','duration'])
        print('Done! Took %f seconds.'%(time.time()-t))
    except ValueError as err: # pandas parser errors derive from ValueError
        print('Could not parse blinks (%s); returning an empty table.'%err)
        dfBlink = pd.DataFrame()

    # determine sample columns based on eyes recorded in file
    if dfFix.empty:
        raise ValueError('No EFIX lines found in %s; cannot determine which eye(s) were recorded.'%elFilename)
    eyesInFile = np.unique(dfFix['eye'])
    if eyesInFile.size==2:
        print('binocular data detected.')
        cols = ['tSample', 'LX', 'LY', 'LPupil', 'RX', 'RY', 'RPupil']
    else:
        eye = eyesInFile[0]
        print('monocular data detected (%c eye).'%eye)
        cols = ['tSample', '%cX'%eye, '%cY'%eye, '%cPupil'%eye]

    # Import samples
    print('Parsing samples...')
    t = time.time()
    dfSamples = _parse_rows(_lines_of('SAMPLE', iStartRec), range(0,len(cols)), cols)
    # Convert values to numbers (anything non-numeric, e.g. '.', becomes NaN)
    for eye in ['L','R']:
        for suffix in ['X','Y','Pupil']:
            col = eye + suffix
            if eye in eyesInFile:
                dfSamples[col] = pd.to_numeric(dfSamples[col],errors='coerce')
            else:
                dfSamples[col] = np.nan
    print('Done! Took %.1f seconds.'%(time.time()-t))

    # Return new compilation dataframe
    return dfRec,dfMsg,dfFix,dfSacc,dfBlink,dfSamples
def separate_in_trials(dfMsg, dfSamples, dfFix, verbose=False):
    """Split samples and fixations into trials using eye tracking messages.

    The first trial starts at the first 'beginning_of_trial' message. Every
    'end_of_trial' message closes the current trial, and its time is also
    used as the start of the next one. Rows are kept when their time lies
    strictly between the trial start and end, and their times are
    re-expressed relative to the trial start. The input dataframes are not
    modified.

    INPUTS:
    -dfMsg is a dataframe with columns 'time' and 'text'.
    -dfSamples is a dataframe with a 'tSample' column.
    -dfFix is a dataframe with 'tStart' and 'tEnd' columns.
    -verbose, when True, shows the samples of each trial (with IPython's
     display if available, otherwise print).

    OUTPUTS:
    -trials is a dict mapping trial number (0-based) to that trial's samples.
    -fixations is a dict mapping trial number to the fixations that start
     inside that trial.
    -all_trials_start_time is the list of the times of every
     'beginning_of_trial' message.
    If there is no 'beginning_of_trial' message, both dicts and the list are
    empty.

    Created by Gustavo Juantorena (github.com/gej1)
    """
    trials = {}
    fixations = {}

    # Take time in message
    begin_times = dfMsg.loc[dfMsg['text'] == 'beginning_of_trial', 'time']
    all_trials_start_time = [int(t) for t in begin_times]
    if not all_trials_start_time:
        # Without a start marker no trial window can be defined.
        return trials, fixations, all_trials_start_time

    show = None
    if verbose:
        # `display` only exists as a global inside IPython/Jupyter sessions.
        try:
            from IPython.display import display as show
        except ImportError:
            show = print

    start_time = all_trials_start_time[0]
    end_times = dfMsg.loc[dfMsg['text'] == 'end_of_trial', 'time']
    for i, end_time in enumerate(int(t) for t in end_times):
        # Samples in trial (copy so the shift below never touches dfSamples)
        in_trial = (dfSamples['tSample'] > start_time) & (dfSamples['tSample'] < end_time)
        df_trial_n = dfSamples.loc[in_trial].copy()
        df_trial_n['tSample'] = df_trial_n['tSample'] - start_time
        trials[i] = df_trial_n

        # Fixations in trial, selected by their start time
        fix_in_trial = (dfFix['tStart'] > start_time) & (dfFix['tStart'] < end_time)
        df_fix_n = dfFix.loc[fix_in_trial].copy()
        df_fix_n['tStart'] = df_fix_n['tStart'] - start_time
        df_fix_n['tEnd'] = df_fix_n['tEnd'] - start_time
        fixations[i] = df_fix_n

        if verbose:
            show(df_trial_n)

        # The next trial begins where this one ended.
        start_time = end_time

    return trials, fixations, all_trials_start_time
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment