PI Export Utility

This Python script extracts gaze, IMU, and template data from raw Pupil Invisible recordings. Gaze and IMU data are exported as CSV files; template data is exported as JSON.
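
For reference, the script relies on the raw Pupil Invisible file layout: gaze coordinates are stored as interleaved little-endian float32 pairs (gaze psN.raw), timestamps as little-endian uint64 nanoseconds (gaze psN.time), and worn flags as single bytes (worn psN.raw). A minimal sketch of reading one such file pair directly with numpy (the part number 1 is only an example):

import numpy as np

coords = np.fromfile("gaze ps1.raw", dtype="<f4").reshape((-1, 2))  # x, y in px
ts = np.fromfile("gaze ps1.time", dtype="<u8")  # timestamps in ns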

Usage: pi-export.py [OPTIONS] [RECORDINGS]...

Options:
  -e, --export-folder TEXT  Relative export path  [default: <recording>/export]
  -f, --force               Overwrite an existing export
  -v, --verbose             Show more log messages (repeat for even more)
  --help                    Show this message and exit.
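
For example, to export two recordings into a custom "my_export" subfolder inside each recording, overwriting any previous export (the recording paths are placeholders):

  python pi-export.py -f -e my_export recording_1/ recording_2/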

Caveats

  1. Requires untouched Pupil Invisible recordings; recordings that have been opened in Pupil Player are not supported
  2. Does not calculate gaze azimuth [deg]/elevation [deg] (yet); see the sketch after this list
  3. Does not calculate IMU roll [deg]/pitch [deg] (yet)
  4. Does not handle incomplete recordings (missing/corrupted files)
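
Caveat 2 can be worked around downstream. Below is a minimal sketch of converting the exported pixel coordinates to azimuth/elevation, assuming opencv-python is installed and the scene camera intrinsics (camera_matrix, dist_coeffs) are known for the recording's hardware; neither is part of this gist:

import cv2
import numpy as np

def gaze_to_angles(coords, camera_matrix, dist_coeffs):
    # coords: (N, 2) array of gaze points in scene-camera pixels
    undistorted = cv2.undistortPoints(
        coords.reshape(-1, 1, 2).astype(np.float64), camera_matrix, dist_coeffs
    ).reshape(-1, 2)
    x, y = undistorted[:, 0], undistorted[:, 1]  # normalized image plane, z = 1
    azimuth = np.rad2deg(np.arctan2(x, 1.0))  # positive = right
    elevation = np.rad2deg(np.arctan2(-y, np.hypot(x, 1.0)))  # positive = up (image y points down)
    return azimuth, elevation

The sign and axis conventions vary between tools, so treat this as one common choice rather than the values Pupil Cloud would report.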

Installation

Requires Python 3.8 or higher

pip install -r requirements.txt

pi-export.py

import json
import logging
import pathlib
import re
from collections import defaultdict

import click
import numpy as np
import pandas as pd
from rich.logging import RichHandler
from rich.progress import track
from rich.traceback import install

install(show_locals=False, suppress=[pd, click])

FILE_PATTERN = re.compile(r"(gaze|worn|extimu) ps(?P<part>\d*)\.(raw|time)")


@click.command()
@click.argument(
    "recordings",
    nargs=-1,
    type=click.Path(
        exists=True,
        writable=True,
        file_okay=False,
        dir_okay=True,
        path_type=pathlib.Path,
    ),
)
@click.option(
    "-e",
    "--export-folder",
    default="export",
    help="Relative export path",
    show_default="<recording>/export",
)
@click.option("-f", "--force", is_flag=True, help="Overwrite an existing export")
@click.option(
    "-v", "--verbose", count=True, help="Show more log messages (repeat for even more)"
)
def main(recordings, export_folder, force, verbose):
    _setup_logging(verbose_option_count=verbose)
    if not recordings:
        logging.error("No recordings provided")
        raise SystemExit(-1)
    logging.info(f"Processing {len(recordings)} recordings")
    for rec in track(recordings):
        process_recording(rec, export_folder, force)


def process_recording(recording: pathlib.Path, export_folder: str, force: bool) -> None:
    if isinstance(recording, bytes):
        # Fix reported case of Click passing bytes instead of pathlib.Path
        recording = pathlib.Path(recording.decode("utf-8"))
    logging.info(f"Processing {recording.resolve()}")
    export_path = recording / export_folder
    if export_path.exists():
        if force:
            logging.warning(f"'{export_path}' exists. Overwriting.")
        else:
            logging.warning(
                f"'{export_path}' exists. No -f/--force provided. Skipping."
            )
            return
    else:
        export_path.mkdir()
    try:
        _process_events(recording, export_path)
        _process_template(recording, export_path)
        _process_gaze(recording, export_path)
        _process_imu(recording, export_path)
    except FileNotFoundError:
        logging.exception(
            "Did not encounter expected files. "
            "An unmodified Invisible recording is required."
        )


def _process_events(recording: pathlib.Path, export_path: pathlib.Path) -> None:
    event_names = (recording / "event.txt").read_text().splitlines()
    event_timestamps = np.fromfile(recording / "event.time", dtype="<u8")
    export_path /= "events.csv"
    events = pd.DataFrame({"timestamp [ns]": event_timestamps, "name": event_names})
    events["type"] = "recording"
    logging.info(f"Exporting event data to '{export_path}'")
    events.to_csv(export_path, index=False)


def _process_template(recording: pathlib.Path, export_path: pathlib.Path) -> None:
    template_questions = json.loads((recording / "template.json").read_text())
    info = json.loads((recording / "info.json").read_text())
    template_responses = info["template_data"]
    assert (
        template_questions["id"] == template_responses["id"]
    ), "Template data is inconsistent"
    merged = {
        "name": template_questions["name"],
        "description": template_questions["description"],
        "template_id": template_questions["id"],
        "Q&A": [
            {
                "question": next(
                    item
                    for item in template_questions["items"]
                    if item["id"] == question_id
                ),
                "responses": responses,
            }
            for question_id, responses in template_responses["data"].items()
        ],
    }
    export_path = export_path / "template.json"
    logging.info(f"Exporting template data to '{export_path}'")
    export_path.write_text(json.dumps(merged, indent=4))


def _process_gaze(recording: pathlib.Path, export_path: pathlib.Path) -> None:
    logging.debug("Processing gaze")
    files_raw = sorted(recording.glob("gaze ps*.raw"), key=_file_sorter_by_part)
    files_ts = sorted(recording.glob("gaze ps*.time"), key=_file_sorter_by_part)
    files_worn = sorted(recording.glob("worn ps*.raw"), key=_file_sorter_by_part)
    assert (
        len(files_raw) == len(files_ts) == len(files_worn) > 0
    ), f"Inconsistent number of files: {files_raw}, {files_ts}, {files_worn}"
    dfs = {
        raw.stem: _process_gaze_file(raw, ts, worn)
        for raw, ts, worn in zip(files_raw, files_ts, files_worn)
    }
    export_path = export_path / "gaze.csv"
    logging.info(f"Exporting gaze to '{export_path}'")
    data = pd.concat(dfs.values(), keys=dfs.keys(), names=["file"]).reset_index("file")
    data.to_csv(export_path, index=False)


def _process_gaze_file(
    raw: pathlib.Path, time: pathlib.Path, worn: pathlib.Path
) -> pd.DataFrame:
    coords = np.fromfile(raw, dtype="<f4").reshape((-1, 2))
    ts = np.fromfile(time, dtype="<u8")
    # Worn flags are stored as single bytes (0 or 255); map to bool
    onoff = (np.fromfile(worn, dtype="<u1") / 255).astype(bool)
    is_consistent = coords.shape[0] == ts.shape[0] == onoff.shape[0]
    if not is_consistent:
        sample_numbers = {
            "gaze": coords.shape[0],
            "time": ts.shape[0],
            "worn": onoff.shape[0],
        }
        num_min_samples = min(sample_numbers.values())
        coords = coords[:num_min_samples, :]
        ts = ts[:num_min_samples]
        onoff = onoff[:num_min_samples]
        logging.warning(
            "Inconsistent sample numbers detected. Reducing to largest consistent "
            f"sample number: {num_min_samples}"
        )
        logging.debug(f"Inconsistent {sample_numbers=}")
    return pd.DataFrame(
        {
            "timestamp [ns]": ts,
            "gaze x [px]": coords[:, 0],
            "gaze y [px]": coords[:, 1],
            "worn": onoff,
        }
    )


def _process_imu(recording: pathlib.Path, export_path: pathlib.Path) -> None:
    logging.debug("Processing IMU")
    files_raw = sorted(recording.glob("extimu ps*.raw"), key=_file_sorter_by_part)
    files_ts = sorted(recording.glob("extimu ps*.time"), key=_file_sorter_by_part)
    assert (
        len(files_raw) == len(files_ts) > 0
    ), f"Inconsistent number of files: {files_raw=}, {files_ts=}"
    dfs = {raw.stem: _process_imu_file(raw, ts) for raw, ts in zip(files_raw, files_ts)}
    export_path = export_path / "imu.csv"
    logging.info(f"Exporting IMU to '{export_path}'")
    data = pd.concat(dfs.values(), keys=dfs.keys(), names=["file"]).reset_index("file")
    data.to_csv(export_path, index=False)


def _process_imu_file(raw: pathlib.Path, time: pathlib.Path) -> pd.DataFrame:
    # Each IMU sample is six little-endian float32 values:
    # gyro x/y/z followed by acceleration x/y/z
    coords = np.fromfile(raw, dtype="<f4").reshape((-1, 6))
    ts = np.fromfile(time, dtype="<u8")
    assert coords.shape[0] == ts.shape[0], "Inconsistent IMU and time data"
    logging.debug(f"'{raw.stem}': {ts.shape[0]} data points")
    return pd.DataFrame(
        {
            "timestamp [ns]": ts,
            "gyro x [deg/s]": coords[:, 0],
            "gyro y [deg/s]": coords[:, 1],
            "gyro z [deg/s]": coords[:, 2],
            "acceleration x [G]": coords[:, 3],
            "acceleration y [G]": coords[:, 4],
            "acceleration z [G]": coords[:, 5],
        }
    )


def _setup_logging(verbose_option_count):
    # Default to WARNING; -v selects INFO, -vv or more selects DEBUG
    # (cap the count so that three or more -v flags do not fall back to WARNING)
    levels = defaultdict(lambda: "WARNING")
    levels[1] = "INFO"
    levels[2] = "DEBUG"
    logging.basicConfig(
        level=levels[min(verbose_option_count, 2)],
        format="%(message)s",
        handlers=[RichHandler()],
    )


def _file_sorter_by_part(file_path: pathlib.Path):
    match = FILE_PATTERN.fullmatch(file_path.name)
    assert match, f"Unexpected file name: '{file_path}'"
    return int(match.group("part"))


if __name__ == "__main__":
    main()

requirements.txt

click
numpy
pandas
rich
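
Once exported, the CSV files load directly with pandas, e.g. (the path assumes a recording folder named "recording" and the default export folder):

import pandas as pd

gaze = pd.read_csv("recording/export/gaze.csv")
# Columns: file, timestamp [ns], gaze x [px], gaze y [px], worn
print(gaze["timestamp [ns]"].diff().median())  # median sampling interval in ns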