Last active
January 19, 2023 02:44
-
-
Save N-M-T/b7221ace2e7acf0c0c836773a3b4cf7c to your computer and use it in GitHub Desktop.
Extract gaze and fixations on surfaces (if available) for a given set of recordings
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import logging | |
import os | |
import traceback as tb | |
import numpy as np | |
import pandas as pd | |
import msgpack | |
logger = logging.getLogger(__name__) | |
def main(recordings, overwrite=False):
    """Extract surface data for every given recording, one after another

    A recording whose processing raises FileNotFoundError (e.g. because
    surfaces.pldata or surfaces_timestamps.npy is missing) is reported with a
    warning and skipped, so the remaining recordings are still handled.

    recordings: List of recording folders
    overwrite: Passed on to process_recording; whether existing csv files
        may be replaced
    """
    for rec in recordings:
        logger.info(f"Extracting {rec}...")
        try:
            process_recording(rec, overwrite=overwrite)
        except FileNotFoundError:
            logger.warning(f"The recording {rec} contained no prerecorded surfaces")
            # Keep the full traceback available at debug level only.
            logger.debug(tb.format_exc())
def process_recording(rec, overwrite=False):
    """Process a single recording

    For each data topic ("gaze", then "fixations") the surface data is loaded,
    annotated with the closest world frame index and written to one csv file
    per surface, named `<topic>_on_surface_<surface name>.csv`, inside the
    recording folder.

    A topic without any data, or a csv file that already exists while
    `overwrite` is false, is skipped with a warning. The remaining topics and
    surfaces are still processed.

    rec: single recording (path of the recording folder)
    overwrite: Boolean indicating if an existing csv file should be overwritten

    Exceptions raised while loading the recording files (e.g.
    FileNotFoundError for a missing file) are not handled here and propagate
    to the caller.
    """
    for surface_data_topic in ("gaze", "fixations"):
        extracted_rows = load_and_yield_data(rec, surface_data_topic)
        df = create_dataframe(extracted_rows, surface_data_topic)
        if df.empty:
            logger.warning(
                f"The recording {rec} contained no prerecorded {surface_data_topic}_on_surface."
            )
            # Skip only this topic; the other topic may still have data.
            continue

        df = add_world_index(df, rec)
        for topic, surface_df in df.groupby(df.topic):
            # Drop only the leading "<prefix>." part so that surface names
            # which themselves contain a dot are kept intact.
            surface_name = topic.split(".", 1)[1]
            csv_out_path = os.path.join(
                rec, surface_data_topic + "_on_surface_" + surface_name + ".csv"
            )
            if os.path.exists(csv_out_path):
                if not overwrite:
                    logger.warning(f"{csv_out_path} exists already! Not overwriting.")
                    # Leave this file alone but keep exporting the others.
                    continue
                logger.warning(f"{csv_out_path} exists already! Overwriting.")
            else:
                logger.info(f"Writing to csv {csv_out_path}...")
            # Non-mutating drop: the group is a slice of the full frame.
            surface_df.drop(columns="topic").to_csv(csv_out_path, index=False)
def load_and_yield_data(directory, surface_data_topic):
    """Lazily read the surface data of a recording and yield flat rows

    Pairs every entry of surfaces_timestamps.npy with the corresponding
    (topic, payload) message of surfaces.pldata and yields one tuple per
    surface-mapped datum: (topic, world timestamp, *extracted datum fields).
    For "gaze" the "gaze_on_surfaces" entries of each payload are used, for
    anything else the "fixations_on_surfaces" entries.

    See the data format documentation [1] for details on the data structure
    Adapted open-source code from Pupil Player [2] to read pldata files
    [1] https://docs.pupil-labs.com/developer/core/recording-format/#pldata-files
    [2] https://github.com/pupil-labs/pupil/blob/master/pupil_src/shared_modules/file_methods.py#L137-L153

    directory: recording folder containing the surface files
    surface_data_topic: "gaze" or "fixations"
    """
    timestamps = np.load(os.path.join(directory, "surfaces_timestamps.npy"))
    pldata_path = os.path.join(directory, "surfaces.pldata")

    # The payload key only depends on the requested topic, so pick it once.
    if surface_data_topic == "gaze":
        payload_key = "gaze_on_surfaces"
    else:  # fixations
        payload_key = "fixations_on_surfaces"

    with open(pldata_path, "rb") as pldata:
        messages = msgpack.Unpacker(pldata, raw=False, use_list=False)
        for world_ts, (topic, payload) in zip(timestamps, messages):
            surface_datum = deserialize_msgpack(payload)
            for datum in surface_datum[payload_key]:
                fields = extract_datum(datum, surface_data_topic)
                yield (topic, world_ts, *fields)
def deserialize_msgpack(msgpack_bytes):
    """Decode a msgpack[1]-encoded bytes object into Python data

    Uses the same options as the outer pldata unpacker (raw=False,
    use_list=False).
    [1] https://msgpack.org/index.html
    """
    decode_options = {"raw": False, "use_list": False}
    return msgpack.unpackb(msgpack_bytes, **decode_options)
def extract_datum(datum, surface_data_topic):
    """
    Flatten one surface-mapped datum into a tuple

    gaze: (timestamp, x, y, on_surf, confidence)
    anything else (fixations):
        (id, timestamp, duration, dispersion, x, y, on_surf)
    where x and y are the first two entries of datum["norm_pos"].
    """
    if surface_data_topic == "gaze":
        leading_keys = ("timestamp",)
        trailing_keys = ("on_surf", "confidence")
    else:  # fixations
        leading_keys = ("id", "timestamp", "duration", "dispersion")
        trailing_keys = ("on_surf",)

    head = tuple(datum[key] for key in leading_keys)
    norm_pos = datum["norm_pos"]
    x, y = norm_pos[0], norm_pos[1]
    tail = tuple(datum[key] for key in trailing_keys)
    return (*head, x, y, *tail)
def create_dataframe(extracted_rows, surface_data_topic):
    """
    Build a pandas dataframe with topic-specific column names from the rows

    extracted_rows: iterable of row tuples as yielded by load_and_yield_data
    surface_data_topic: "gaze" selects the gaze columns, anything else the
        fixation columns
    """
    gaze_columns = (
        "topic",
        "world_timestamp",
        "gaze_timestamp",
        "x_norm",
        "y_norm",
        "on_surf",
        "confidence",
    )
    fixation_columns = (
        "topic",
        "world_timestamp",
        "fixation_id",
        "start_timestamp",
        "duration",
        "dispersion",
        "norm_pos_x",
        "norm_pos_y",
        "on_surf",
    )
    selected = gaze_columns if surface_data_topic == "gaze" else fixation_columns
    return pd.DataFrame(extracted_rows, columns=list(selected))
def find_closest(target, source):
    """Return, for each element of `source`, the index of the closest `target` element.

    `target` is assumed to be sorted. Result has same shape as `source`.
    On an exact tie between two neighbours the right-hand one is chosen.
    Implementation taken from:
    https://stackoverflow.com/questions/8914491/finding-the-nearest-value-and-return-the-index-of-array-in-python/8929827#8929827
    helper function to find world indices
    """
    target = np.asarray(target)  # fixes https://github.com/pupil-labs/pupil/issues/1439
    # Insertion positions, limited so that both neighbours (pos - 1, pos) exist.
    pos = np.clip(np.searchsorted(target, source), 1, len(target) - 1)
    lower = target[pos - 1]
    upper = target[pos]
    # Step back by one wherever the left neighbour is strictly closer.
    left_is_closer = source - lower < upper - source
    pos -= left_is_closer
    return pos
def add_world_index(df, recording):
    """Add a "world_idx" column holding the closest world frame index per row

    Loads world_timestamps.npy from the recording folder, matches every
    `world_timestamp` of `df` to its closest entry and inserts the resulting
    indices as the third column. `df` is modified in place and returned.

    df: dataframe with a `world_timestamp` column
    recording: recording folder containing world_timestamps.npy
    """
    world_timestamps = np.load(os.path.join(recording, "world_timestamps.npy"))
    closest_frames = find_closest(world_timestamps, df.world_timestamp)
    df.insert(2, "world_idx", closest_frames)
    return df
if __name__ == "__main__":
    # Log everything down to debug level, including skipped-recording tracebacks.
    logging.basicConfig(level=logging.DEBUG)

    # Texts shown by the command line interface.
    cli_description = (
        "Extract surface-mapped gaze (if available) "
        "for a set of given recordings. "
        "The resulting csv file will be saved within its "
        "according recording."
    )
    overwrite_help = (
        "Usually, the command refuses to overwrite existing csv files. "
        "This flag disables these checks."
    )

    # Command line interface: optional overwrite flag plus >= 1 recording folder.
    parser = argparse.ArgumentParser(description=cli_description)
    parser.add_argument(
        "-f", "--overwrite", action="store_true", help=overwrite_help
    )
    parser.add_argument("recordings", nargs="+", help="One or more recordings")

    # Parse the arguments and hand them to the main procedure.
    args = parser.parse_args()
    main(recordings=args.recordings, overwrite=args.overwrite)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment