|
import json |
|
import logging |
|
import pathlib |
|
import re |
|
from collections import defaultdict |
|
|
|
import click |
|
import numpy as np |
|
import pandas as pd |
|
from rich.logging import RichHandler |
|
from rich.progress import track |
|
from rich.traceback import install |
|
|
|
# Pretty rich tracebacks; hide locals and suppress frames from pandas/click
# internals so errors point at this script's code.
install(show_locals=False, suppress=[pd, click])


# Matches Invisible sensor part files such as "gaze ps1.raw" or "extimu ps2.time".
# NOTE(review): the "part" group uses \d* and can match empty — confirm whether
# part-less file names actually occur in recordings.
FILE_PATTERN = re.compile(r"(gaze|worn|extimu) ps(?P<part>\d*)\.(raw|time)")
|
|
|
|
|
@click.command()
@click.argument(
    "recordings",
    nargs=-1,
    type=click.Path(
        exists=True,
        writable=True,
        file_okay=False,
        dir_okay=True,
        path_type=pathlib.Path,
    ),
)
@click.option(
    "-e",
    "--export-folder",
    default="export",
    help="Relative export path",
    show_default="<recording>/export",
)
@click.option("-f", "--force", is_flag=True, help="Overwrite an existing export")
@click.option(
    "-v", "--verbose", count=True, help="Show more log messages (repeat for even more)"
)
def main(recordings, export_folder, force, verbose):
    """Export every recording directory given on the command line.

    Exits with a non-zero status if no recording was provided.
    """
    _setup_logging(verbose_option_count=verbose)
    if recordings:
        logging.info(f"Processing {len(recordings)} recordings")
        for recording in track(recordings):
            process_recording(recording, export_folder, force)
    else:
        logging.error("No recordings provided")
        raise SystemExit(-1)
|
|
|
|
|
def process_recording(recording: pathlib.Path, export_folder: str, force: bool) -> None:
    """Export a single Invisible recording into ``<recording>/<export_folder>``.

    Skips the recording when the export folder already exists and *force* is
    not set. Missing input files are logged (with traceback) but not re-raised,
    so a bad recording does not abort a batch run.
    """
    if isinstance(recording, bytes):
        # Fix reported case of Click passing bytes instead of pathlib.Path
        recording = pathlib.Path(recording.decode("utf-8"))
    logging.info(f"Processing {recording.resolve()}")
    export_path = recording / export_folder
    if export_path.exists():
        if force:
            logging.warning(f"'{export_path}' exists. Overwriting.")
        else:
            logging.warning(
                f"'{export_path}' exists. No -f/--force provided. Skipping."
            )
            return
    else:
        # parents=True: a nested --export-folder (e.g. "exports/v1") previously
        # crashed with FileNotFoundError from a bare mkdir().
        export_path.mkdir(parents=True)

    try:
        _process_events(recording, export_path)
        _process_template(recording, export_path)
        _process_gaze(recording, export_path)
        _process_imu(recording, export_path)
    except FileNotFoundError:
        logging.exception(
            "Did not encounter expected files. "
            "An unmodified Invisible recording is required."
        )
|
|
|
|
|
def _process_events(recording: pathlib.Path, export_path: pathlib.Path) -> None:
    """Combine event names (event.txt) and timestamps (event.time) into
    ``<export_path>/events.csv``."""
    names = (recording / "event.txt").read_text().splitlines()
    timestamps = np.fromfile(recording / "event.time", dtype="<u8")

    frame = pd.DataFrame({"timestamp [ns]": timestamps, "name": names})
    # All events in these files are recording-level events.
    frame["type"] = "recording"
    target = export_path / "events.csv"
    logging.info(f"Exporting event data to '{target}'")
    frame.to_csv(target, index=False)
|
|
|
|
|
def _process_template(recording: pathlib.Path, export_path: pathlib.Path) -> None:
    """Join template questions (template.json) with the wearer's responses
    (info.json -> "template_data") and write ``<export_path>/template.json``."""
    questions = json.loads((recording / "template.json").read_text())
    responses = json.loads((recording / "info.json").read_text())["template_data"]
    assert questions["id"] == responses["id"], "Template data is inconsistent"

    def _question_by_id(question_id):
        # First item whose id matches; raises StopIteration for unknown ids.
        return next(item for item in questions["items"] if item["id"] == question_id)

    merged = {
        "name": questions["name"],
        "description": questions["description"],
        "template_id": questions["id"],
        "Q&A": [
            {"question": _question_by_id(qid), "responses": answers}
            for qid, answers in responses["data"].items()
        ],
    }
    target = export_path / "template.json"
    logging.info(f"Exporting template data to '{target}'")
    target.write_text(json.dumps(merged, indent=4))
|
|
|
|
|
def _process_gaze(recording: pathlib.Path, export_path: pathlib.Path) -> None:
    """Concatenate all gaze parts ("gaze psN.raw/.time" + "worn psN.raw")
    into ``<export_path>/gaze.csv``, keeping the source stem in a "file" column.

    Raises AssertionError if the raw/time/worn file counts differ or no
    gaze files are found.
    """
    logging.debug("Processing gaze")  # fixed: was an f-string with no placeholders
    files_raw = sorted(recording.glob("gaze ps*.raw"), key=_file_sorter_by_part)
    files_ts = sorted(recording.glob("gaze ps*.time"), key=_file_sorter_by_part)
    files_worn = sorted(recording.glob("worn ps*.raw"), key=_file_sorter_by_part)
    assert (
        len(files_raw) == len(files_ts) == len(files_worn) > 0
    ), f"Inconsistent number of files: {files_raw}, {files_ts}, {files_worn}"
    dfs = {
        raw.stem: _process_gaze_file(raw, ts, worn)
        for raw, ts, worn in zip(files_raw, files_ts, files_worn)
    }

    export_path = export_path / "gaze.csv"
    logging.info(f"Exporting gaze to '{export_path}'")
    # Stack the per-part frames; the part key becomes a regular "file" column.
    data = pd.concat(dfs.values(), keys=dfs.keys(), names=["file"]).reset_index("file")
    data.to_csv(export_path, index=False)
|
|
|
|
|
def _process_gaze_file(
    raw: pathlib.Path, time: pathlib.Path, worn: pathlib.Path
) -> pd.DataFrame:
    """Read one gaze part: pixel coordinates, timestamps, and worn flags.

    If the three streams disagree in length, all are truncated to the
    shortest one and a warning is logged.
    """
    coords = np.fromfile(raw, dtype="<f4").reshape((-1, 2))
    ts = np.fromfile(time, dtype="<u8")
    onoff = (np.fromfile(worn, dtype="<u1") / 255).astype(bool)

    sample_numbers = {
        "gaze": coords.shape[0],
        "time": ts.shape[0],
        "worn": onoff.shape[0],
    }
    if len(set(sample_numbers.values())) > 1:
        num_min_samples = min(sample_numbers.values())
        coords = coords[:num_min_samples, :]
        ts = ts[:num_min_samples]
        onoff = onoff[:num_min_samples]
        logging.warning(
            "Inconsistent sample numbers detected. Reducing to largest consistent "
            f"sample number: {num_min_samples}"
        )
        logging.debug(f"Inconsistent {sample_numbers=}")

    return pd.DataFrame(
        {
            "timestamp [ns]": ts,
            "gaze x [px]": coords[:, 0],
            "gaze y [px]": coords[:, 1],
            "worn": onoff,
        }
    )
|
|
|
|
|
def _process_imu(recording: pathlib.Path, export_path: pathlib.Path) -> None:
    """Concatenate all IMU parts ("extimu psN.raw/.time") into
    ``<export_path>/imu.csv``, keeping the source stem in a "file" column.

    Raises AssertionError if the raw/time file counts differ or no IMU
    files are found.
    """
    logging.debug("Processing IMU")  # fixed: was an f-string with no placeholders
    files_raw = sorted(recording.glob("extimu ps*.raw"), key=_file_sorter_by_part)
    files_ts = sorted(recording.glob("extimu ps*.time"), key=_file_sorter_by_part)
    assert (
        len(files_raw) == len(files_ts) > 0
    ), f"Inconsistent number of files: {files_raw=}, {files_ts=}"
    dfs = {raw.stem: _process_imu_file(raw, ts) for raw, ts in zip(files_raw, files_ts)}

    export_path = export_path / "imu.csv"
    logging.info(f"Exporting IMU to '{export_path}'")
    # Stack the per-part frames; the part key becomes a regular "file" column.
    data = pd.concat(dfs.values(), keys=dfs.keys(), names=["file"]).reset_index("file")
    data.to_csv(export_path, index=False)
|
|
|
|
|
def _process_imu_file(raw: pathlib.Path, time: pathlib.Path) -> pd.DataFrame:
    """Read one IMU part: 6 float32 channels per sample (gyro xyz, accel xyz)
    plus one uint64 timestamp per sample."""
    samples = np.fromfile(raw, dtype="<f4").reshape((-1, 6))
    ts = np.fromfile(time, dtype="<u8")
    assert samples.shape[0] == ts.shape[0], "Inconsistent IMU and time data"
    logging.debug(f"'{raw.stem}': {ts.shape[0]} data points")

    # Column order mirrors the on-disk channel order.
    labels = (
        "gyro x [deg/s]",
        "gyro y [deg/s]",
        "gyro z [deg/s]",
        "acceleration x [G]",
        "acceleration y [G]",
        "acceleration z [G]",
    )
    columns = {"timestamp [ns]": ts}
    for index, label in enumerate(labels):
        columns[label] = samples[:, index]
    return pd.DataFrame(columns)
|
|
|
|
|
def _setup_logging(verbose_option_count: int) -> None:
    """Configure root logging with a RichHandler.

    0 -> WARNING, 1 -> INFO, 2 or more -> DEBUG.

    Bug fix: the previous defaultdict lookup only special-cased counts 1 and
    2, so three or more -v flags silently fell back to WARNING — i.e. -vvv was
    *less* verbose than -vv. The count is now clamped at DEBUG.
    """
    levels = {0: "WARNING", 1: "INFO", 2: "DEBUG"}
    logging.basicConfig(
        level=levels[min(verbose_option_count, 2)],
        format="%(message)s",
        handlers=[RichHandler()],
    )
|
|
|
|
|
def _file_sorter_by_part(file_path: pathlib.Path) -> int:
    """Sort key: the numeric part suffix of an Invisible sensor file name.

    FILE_PATTERN's "part" group is \\d* and may match the empty string;
    previously that crashed with ``int('') -> ValueError``. Part-less files
    now sort first as part 0.
    """
    match = FILE_PATTERN.fullmatch(file_path.name)
    assert match, f"Unexpected file name: '{file_path}'"
    return int(match.group("part") or 0)
|
|
|
|
|
# Script entry point; click parses sys.argv inside main().
if __name__ == "__main__":
    main()