Scanpath Visualization

This Python script reads the Pupil Cloud raw data export and renders the subject's scanpath into the scene video. It expects the unzipped export folder to contain the scene video (*.mp4), gaze.csv, world_timestamps.csv, and fixations.csv (or fixations.json).

Installation

  1. To install the requirements, run pip install more_itertools pandas numpy tqdm av opencv-python
  2. Download the scanpathvis.py file to your computer

Usage

  1. Right-click the corresponding recording in Pupil Cloud
  2. Select Downloads -> Download Recording
  3. Unzip the downloaded raw data archive
  4. Run the following command in the terminal (this assumes the dependencies are already installed; a concrete example follows below)
python <path to scanpathvis.py file> <path to folder containing scene video>
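
For example, if scanpathvis.py and an unzipped export folder named my_recording (a placeholder name) sit in the current directory:

python scanpathvis.py my_recording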

Code written by @dourvaris

more_itertools
pandas
numpy
tqdm
av
opencv-python
import cv2
import json
import pathlib
import tqdm
import numpy as np
import pandas as pd
import sys
import csv
import av
import more_itertools
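
# Overview:
#   1. Read the raw export (gaze.csv, world_timestamps.csv, and
#      fixations.csv or fixations.json) from the folder passed on the
#      command line.
#   2. Decode the scene video frame by frame.
#   3. Draw the gaze samples that fall into each frame.
#   4. Carry recent fixations forward with Lucas-Kanade optical flow so
#      the scanpath stays attached to the scene content.
#   5. Encode the annotated frames into scan_path_visualization.mp4 and
#      log the per-frame fixation trail to scanpath.json.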
recpath = pathlib.Path(sys.argv[1])

try:
    scene_vid = list(recpath.glob("*.mp4"))[0]
except Exception:
    raise Exception("could not find scene video file")

print("Reading gaze data...")
gaze_stream = more_itertools.peekable(pd.read_csv(recpath / "gaze.csv").to_records())

print("Reading world timestamps...")
world_ts = [
    row["timestamp [ns]"]
    for row in pd.read_csv(recpath / "world_timestamps.csv").to_records()
]

# Parameters for the pyramidal Lucas-Kanade optical flow used to track
# fixation points from one scene frame to the next.
_lk_params = dict(
    winSize=(90, 90),
    maxLevel=3,
    criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03),
    minEigThreshold=0.005,
)

input_container = av.open(str(scene_vid))
output_container = av.open(str(recpath / "scan_path_visualization.mp4"), "w")

# Prefer hardware (NVENC) encoding and fall back to software h264.
try:
    output_video = output_container.add_stream("h264_nvenc")
    print("using nvenc encoding")
except Exception as e:
    print("nvenc not available", e)
    output_video = output_container.add_stream("h264")
output_video.options["bf"] = "0"
output_video.options["movflags"] = "faststart"
output_video.codec_context.height = 1080
output_video.codec_context.width = 1088
output_video.codec_context.time_base = input_container.streams.video[0].time_base

def stream_fixations(recpath):
    """Yield fixations from fixations.csv, or from fixations.json as a fallback."""
    fixations_csv = recpath / "fixations.csv"
    if fixations_csv.exists():
        yield from csv.DictReader(fixations_csv.open())
    else:
        fixations_json = recpath / "fixations.json"
        if fixations_json.exists():
            for fixation in json.load(fixations_json.open()):
                if fixation["label"] == "fixation":
                    # Map the JSON fields onto the CSV column names used below.
                    yield {
                        "fixation id": fixation["instance"],
                        "fixation x [px]": fixation["start_x"],
                        "fixation y [px]": fixation["start_y"],
                        "start timestamp [ns]": fixation["start_t"],
                        "end timestamp [ns]": fixation["end_t"],
                        "duration [ms]": (fixation["end_t"] - fixation["start_t"])
                        / 1e6,
                    }


fixations_stream = more_itertools.peekable(stream_fixations(recpath))

yellow = (0, 255, 255)
red = (0, 0, 255)
green = (0, 255, 0)
white = (255, 255, 255)
blue = (255, 0, 0)
black = (0, 0, 0)
orange = (0, 122, 255)
gaze_color = red
text_color = yellow
fixation_colors = [orange]
scan_color = yellow

prev_img = None  # previous frame, needed as the optical flow reference
scanpath = (recpath / "scanpath.json").open("w")
trail = []  # fixations currently shown as the scanpath

# Pair each decoded frame with its start/stop timestamps.
world_timestamps = zip(world_ts[:-1], world_ts[1:])
world_frames = zip(world_timestamps, input_container.decode(video=0))

with output_container:
    for (world_start_ts, world_stop_ts), world_frame in tqdm.tqdm(
        world_frames, unit=" frames", total=len(world_ts) - 1
    ):
        if world_frame.pts is None:
            continue
        world_img = world_frame.to_ndarray(format="bgr24")
        draw_img = world_img.copy()

        # Draw a circle for every gaze sample that falls into this frame.
        while 1:
            try:
                next_gaze = gaze_stream.peek()
            except StopIteration:
                break
            if next_gaze is None or next_gaze["timestamp [ns]"] > world_stop_ts:
                break
            gaze = next(gaze_stream)
            px = gaze["gaze x [px]"]
            py = gaze["gaze y [px]"]
            cv2.circle(draw_img, (int(px), int(py)), 20, gaze_color, 3)

        # Add fixations that started before the end of this frame to the trail.
        while 1:
            try:
                next_fixation = fixations_stream.peek()
            except StopIteration:
                break
            if (
                next_fixation is None
                or int(next_fixation["start timestamp [ns]"]) > world_stop_ts
            ):
                break
            fixation = next(fixations_stream)
            trail.append(fixation)
        # Forget fixations that ended more than history_size (1 second,
        # in nanoseconds) before the end of the current frame.
        history_size = int(1e9)
        if prev_img is not None and trail:
            trail = [
                fixation
                for fixation in trail
                if int(fixation["end timestamp [ns]"]) > world_stop_ts - history_size
            ]
            if trail:
                # Track the fixation points from the previous frame into the
                # current one with pyramidal Lucas-Kanade optical flow.
                new_points, status, err = cv2.calcOpticalFlowPyrLK(
                    prev_img,
                    world_img,
                    np.array(
                        [
                            (
                                float(fixation["fixation x [px]"]),
                                float(fixation["fixation y [px]"]),
                            )
                            for fixation in trail
                        ]
                    )
                    .astype(np.float32)
                    .reshape(-1, 2),
                    None,
                    **_lk_params,
                )
                if new_points is not None:
                    # Keep only fixations whose points were tracked successfully.
                    newtrail = []
                    for fixation, ok, point in zip(trail, status, new_points):
                        fixation["fixation x [px]"] = float(point[0])
                        fixation["fixation y [px]"] = float(point[1])
                        if ok:
                            newtrail.append(fixation)
                    trail = newtrail
        # Log the visible trail for this frame as one JSON object per line.
        scanpath.write(
            json.dumps(
                {
                    "ts": int(world_start_ts),
                    "f": [
                        {
                            "x": int(t["fixation x [px]"]),
                            "y": int(t["fixation y [px]"]),
                            "i": t["fixation id"],
                        }
                        for t in trail
                    ],
                }
            )
            + "\n"
        )
        # Draw the scanpath: connect consecutive fixations with lines and
        # scale each fixation circle with its duration.
        prev_p = None
        for fixation in trail:
            p = int(fixation["fixation x [px]"]), int(fixation["fixation y [px]"])
            if prev_p:
                cv2.line(draw_img, prev_p, p, scan_color, 2)
            size = int(int(fixation["duration [ms]"]) / 250.0 * 20)
            fixation_color = fixation_colors[
                int(fixation["fixation id"]) % len(fixation_colors)
            ]
            cv2.circle(draw_img, p, size, fixation_color, 3)
            # Label the fixation with its id, drawn twice for a black outline.
            plot_point = (p[0] + 20, p[1] - size)
            for color, thickness in [(black, 4), (fixation_color, 2)]:
                cv2.putText(
                    draw_img,
                    str(fixation["fixation id"]),
                    plot_point,
                    cv2.FONT_HERSHEY_SIMPLEX,
                    fontScale=1,
                    thickness=thickness,
                    color=color,
                )
            prev_p = p

        # Uncomment to preview the rendered frames while the script runs:
        # cv2.imshow("world", draw_img)
        # if cv2.waitKey(1) & 0xFF == ord("q"):
        #     break
        # Encode the annotated frame, keeping the original presentation timestamp.
        new_frame = world_frame.from_ndarray(draw_img, format="bgr24")
        new_frame.pts = world_frame.pts
        packets = output_video.encode(new_frame)
        output_container.mux(packets)
        prev_img = world_img

    # Flush frames still buffered in the encoder before the container closes.
    output_container.mux(output_video.encode())

scanpath.close()
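
Besides the video, the script writes scanpath.json into the recording folder, one JSON object per scene frame, so the tracked trail can be reused without re-rendering. A minimal sketch for reading it back, assuming the same placeholder folder name as in the usage example:

import json
import pathlib

recpath = pathlib.Path("my_recording")  # placeholder export folder

with (recpath / "scanpath.json").open() as f:
    for line in f:
        frame = json.loads(line)
        # "ts" is the frame start timestamp in nanoseconds; "f" lists the
        # fixations visible in that frame with their tracked pixel positions.
        for fixation in frame["f"]:
            print(frame["ts"], fixation["i"], fixation["x"], fixation["y"])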