Skip to content

Instantly share code, notes, and snippets.

@elepl94
Last active October 19, 2023 16:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save elepl94/9f669c4d81e455cf2095957831219664 to your computer and use it in GitHub Desktop.
Save elepl94/9f669c4d81e455cf2095957831219664 to your computer and use it in GitHub Desktop.
Script to create images and videos of fixation scanpath over the reference image

Fixation Scanpath Image and Video Generator

Prerequisites

Before you get started, make sure you have the following:

  • A Reference Image Mapper export download.
  • Python 3.7 or higher installed on your system.

Installation

  1. Navigate to the downloaded gist folder
  2. Run the command pip install -r requirements.txt
  3. Run the command python3 rim_scanpath.py in your terminal and follow the prompted instructions

Personalization

Individual scanpath color

This script comes with a function that generates random colors for each participant based on their names. However, if you prefer to assign specific colors to each participant, you can easily modify the function. For example:

def color_generator(names):
    colors = {}
    colors['Subject1'] = (0, 0, 255)
    colors['Subject2'] = (255, 0, 0)
    colors['Subject3'] = (0, 255, 0)
    
    return colors

Font customization

You can also personalize the text displayed on the fixation scanpaths. The script uses OpenCV for text rendering. To change the font, size, or adjust other text aesthetics, you can edit the draw_on_frame() function. Fixation IDs are displayed in black text with a white border to ensure visibility against the background. If you modify the font size, it's recommended to adjust font_thick_w and font_thick_b values to maintain visual contrast.

def draw_on_frame(...):
    # text aesthetics
    font = cv2.FONT_HERSHEY_DUPLEX
    font_size = 1
    font_thick_w = 3 
    font_thick_b = 1
    ...

Legends

The script includes two functions for creating legends:

  1. draw_name_legend()
  2. draw_all_names_legend()

To customize the appearance of the legend, such as its position, dimensions, or colors of the rectangular white box or the colored line, you can modify the following parameters in both functions:

  • r_end_point: x and y values of the ending coordinates of the rectangular legend box.
  • r_start_point: x and y values of the starting coordinates of the rectangular legend box.
  • l_end_point: x and y values of the ending coordinates of the colored line.
  • l_start_point: x and y values of the starting coordinates of the colored line.
opencv-python
numpy
pandas
tqdm
pathlib  # note: unnecessary on Python 3 — pathlib is in the standard library
rich
moviepy
import os
import cv2
import math
import random
import shutil
import logging
import numpy as np
import pandas as pd
import tkinter as tk
from tqdm import tqdm
from tkinter import ttk
from pathlib import Path
from tkinter import messagebox
from tkinter import filedialog
from rich.logging import RichHandler
from moviepy.editor import ImageClip
from contextlib import contextmanager
# Configure the root logger once at import time: plain messages rendered by
# rich's handler at INFO level (all logging.info calls below go through this).
logger = logging.getLogger()
logging.basicConfig(
    format="%(message)s", datefmt="[%X]", level=logging.INFO, handlers=[RichHandler()]
)
# select the RIM export folder
def select_folder():
    """Open a directory picker and return the chosen RIM export folder.

    Raises:
        Exception: if the chosen folder is missing any expected export file.
    """
    picker_root = tk.Tk()
    picker_root.withdraw()  # hide the empty Tk main window; only the dialog is needed
    rim_folder = filedialog.askdirectory()
    expected = (
        "fixations.csv",
        "gaze.csv",
        "reference_image.jpeg",
        "sections.csv",
        "enrichment_info.txt",
    )
    have_all = all(
        os.path.isfile(os.path.join(rim_folder, name)) for name in expected
    )
    if not have_all:
        raise Exception(
            "Wrong folder! Please select a Reference Image Mapper export folder"
        )
    logging.info(f"RIM export folder: {rim_folder}")
    return rim_folder
# set paths to files from RIM export folder
def paths_to_rim_files(rim_folder):
    """Return a dict mapping each needed RIM export file name to its full Path."""
    base = Path(rim_folder)
    wanted = (
        "fixations.csv",
        "sections.csv",
        "gaze.csv",
        "reference_image.jpeg",
    )
    return {name: base / name for name in wanted}
import tkinter as tk
# Initialize the selected_id as None by default.
# Module-level flag mutated by the Yes/No callbacks in check_sections_csv():
# True = user said the scanning recording is in the list, False = it is not,
# None = the dialog was never shown.
selected_id = None
# check if the scanning recording id is included among the wearers to exclude it from the scanpath generation
def check_sections_csv(filepaths):
    """Ask the user (via a small Tk dialog) whether the scanning recording is
    listed among the wearers shown in sections.csv.

    Returns:
        True if the user pressed 'Yes', False if 'No', or None when the dialog
        was never shown (single recording, or the expected start/end events
        are absent).
    """
    # Load sections.csv as a pandas DataFrame
    df = pd.read_csv(filepaths["sections.csv"])
    # Check if there is more than one row for the "recording id" column
    if df["recording id"].nunique() > 1:
        # Check if the "start recording event" column has "recording.begin"
        if "recording.begin" in df["start event name"].unique():
            # Check if the "end recording event" column has "recording.end"
            if "recording.end" in df["end event name"].unique():
                # Create a GUI to ask if the scanning recording should be included
                root = tk.Tk()
                root.title("Include Scanning Recording?")

                def include_scanning():
                    root.quit()
                    global selected_id
                    selected_id = True  # Set selected_id to True if 'Yes' is pressed

                def exclude_scanning():
                    root.quit()
                    global selected_id
                    selected_id = False  # Set selected_id to False if 'No' is pressed

                # Create a frame for the message box
                frame = tk.Frame(root)
                frame.pack(padx=20, pady=10)
                # Display a message label
                message_label = tk.Label(
                    frame,
                    text="Is the scanning recording included in the following list?"
                )
                message_label.pack()
                # Add a listbox to display available recording names
                row_listbox = tk.Listbox(frame, width=100)
                # Insert the names into the listbox.
                # NOTE(review): row[2]/row[3]/row[5] are positional itertuples
                # fields — presumably recording id / recording name / wearer
                # name; verify against the sections.csv column order.
                for row in df.itertuples():
                    row_listbox.insert(
                        tk.END,
                        f"Id: {row[2]} | Recording name: {row[3]} | Wearer: {row[5]}",
                    )
                row_listbox.pack()
                # Add 'Yes' and 'No' buttons
                yes_button = tk.Button(frame, text="Yes", command=include_scanning)
                no_button = tk.Button(frame, text="No", command=exclude_scanning)
                yes_button.pack(side="left", padx=10)
                no_button.pack(side="left", padx=10)
                root.mainloop()
                root.destroy()
    # Return the selected_id value or None if no conditions are met
    return selected_id
# change fixation nr to start from 1 for each subject
def reset_fixation_id(filepaths):
    """Renumber 'fixation id' so it starts at 1 within every section.

    Reads every CSV in *filepaths* except sections.csv, shifts each section's
    fixation ids so the smallest becomes 1 (NaN ids stay NaN), and returns
    {file_name: [DataFrame]} — single-element lists, the layout the rest of
    the pipeline expects.
    """
    rim_data = {}
    csv_names = [
        name
        for name in filepaths
        if name.endswith(".csv") and name != "sections.csv"
    ]
    for file_name in csv_names:
        data = pd.read_csv(filepaths[file_name])
        # Vectorized renumbering: subtract each section's minimum id and add 1,
        # instead of looping over the sections one by one.
        data["fixation id new"] = (
            data["fixation id"]
            - data.groupby("section id")["fixation id"].transform("min")
            + 1
        )
        # Drop the old column and rename, so 'fixation id' ends up as the LAST
        # column — downstream code indexes gaze columns positionally (iloc),
        # so this column order must be preserved.
        data.drop(columns=["fixation id"], inplace=True)
        data.rename(columns={"fixation id new": "fixation id"}, inplace=True)
        rim_data[file_name] = [data]
    logging.info("Set fixation nr to start from 1")
    return rim_data
# add subject name to data
def set_name(rim_data, filepaths):
    """Attach a 'names' column (wearer name) to the fixation and gaze frames.

    Duplicated wearer names in sections.csv are disambiguated with an _<n>
    suffix and the file is written back to disk.
    """
    sections = pd.read_csv(filepaths["sections.csv"])
    # Disambiguate duplicated wearer names as name_1, name_2, ...
    dup_mask = sections.duplicated(subset=["wearer name"], keep=False)
    if dup_mask.any():
        seen = {}
        for idx, row in sections.iterrows():
            wearer = row["wearer name"]
            if dup_mask[idx]:
                seen[wearer] = seen.get(wearer, 0) + 1
                sections.at[idx, "wearer name"] = f"{wearer}_{seen[wearer]}"
        # Persist the disambiguated names back to the export folder
        sections.to_csv(filepaths["sections.csv"], index=False)
    names = list(sections["wearer name"].unique())
    # NOTE(review): section ids are paired with wearer names purely by order of
    # appearance — assumes sections.csv rows and the data files' section ids
    # are sorted the same way; verify against the export.
    for file_name in ("fixations.csv", "gaze.csv"):
        if file_name not in rim_data:
            continue
        data = rim_data[file_name][0]
        for section_id, wearer in zip(data["section id"].unique(), names):
            data.loc[data["section id"] == section_id, ["names"]] = wearer
        rim_data[file_name] = [data]
    logging.info("Wearer names added to fixation and gaze files")
# add fixation x, y, duration and detected on ref img to gaze.csv
def add_fix_xyd(rim_data):
    """Copy fixation x/y/duration/on-image info onto each matching gaze row.

    Gaze rows are matched to fixations on (section id, fixation id); rows with
    no matching fixation keep None in the four new columns.
    """
    gaze = rim_data["gaze.csv"][0]
    fixations = rim_data["fixations.csv"][0]
    # New columns default to None for gaze samples without a fixation.
    # Column creation order matters: downstream code indexes them positionally.
    for col in ("fix x", "fix y", "fix d", "fix in refimg"):
        gaze[col] = None
    for row_idx, gaze_row in gaze.iterrows():
        section_id = gaze_row["section id"]
        fixation_id = gaze_row["fixation id"]
        match = fixations[
            (fixations["section id"] == section_id)
            & (fixations["fixation id"] == fixation_id)
        ]
        if len(match) == 0:
            continue
        first = match.iloc[0]
        gaze.at[row_idx, "fix x"] = first["fixation x [px]"]
        gaze.at[row_idx, "fix y"] = first["fixation y [px]"]
        gaze.at[row_idx, "fix d"] = first["duration [ms]"]
        gaze.at[row_idx, "fix in refimg"] = first["fixation detected in reference image"]
    rim_data["gaze.csv"] = [gaze]
    rim_data["fixations.csv"] = [fixations]
    logging.info("Fixations info merged to gaze file")
"""
CircleScaler is a class for interactively visualizing fixation scanpaths over a reference image.
This class allows users to scale and display fixation circles representing fixation durations on the
reference image. It provides options to increase or decrease the scale factor, and it displays two fixation circles:
one in red for longer fixations and another in green for shorter fixations. Users can interact with the
preview window to adjust the scale and visualize the impact on the fixation circles.
"""
class CircleScaler:
def __init__(self, img_path, rim_data):
self.img = cv2.imread(img_path)
self.img_height = self.img.shape[0]
self.img_width = self.img.shape[1]
self.max_xc = int((self.img_width / 4) * 3)
self.min_xc = int(self.img_width / 4)
self.yc = int(self.img_height / 2)
self.gaze = rim_data["gaze.csv"][0]
self.max_d = self.gaze["fix d"].max()
self.min_d = self.gaze["fix d"].min()
self.scale_factor = 1.0
self.update_circles()
def __exit__(self, exc_type, exc_value, exc_traceback):
cv2.destroyAllWindows()
def get_display_dimensions(self):
root = tk.Tk()
root.withdraw() # Hide the main window
screen_width = root.winfo_screenwidth()
screen_height = root.winfo_screenheight()
root.destroy()
return screen_width, screen_height
def update_circles(self):
# Copy the original image
img_copy = self.img.copy()
# Get the display dimensions
display_width, display_height = self.get_display_dimensions()
# Check if the image size exceeds the maximum allowable dimensions
if img_copy.shape[0] > display_height or img_copy.shape[1] > display_width:
# Calculate the aspect ratio
aspect_ratio = min(display_width / img_copy.shape[1], display_height / img_copy.shape[0])
# Resize the image to fit within the maximum allowable dimensions
img_copy = cv2.resize(img_copy, (int(img_copy.shape[1] * aspect_ratio), int(img_copy.shape[0] * aspect_ratio)))
# Draw the max circle, min circle, and display the scaled image
max_radius = int((self.max_d * self.scale_factor) * aspect_ratio)
cv2.circle(img_copy, (int(self.max_xc * aspect_ratio), int(self.yc * aspect_ratio)), max_radius, (0, 0, 255), -1)
min_radius = int((self.min_d * self.scale_factor) * aspect_ratio)
cv2.circle(img_copy, (int(self.min_xc * aspect_ratio), int(self.yc*aspect_ratio)), min_radius, (0, 255, 0), -1)
# Display the scaled image
cv2.imshow(
"Fixation circles preview (red = longer fixation; green = shorter fixation) - please press m to upscale, n to down-scale and q to save",
img_copy,
)
def on_minus_click(self):
self.scale_factor = max(0.001, self.scale_factor - 0.01)
self.update_circles()
def on_plus_click(self):
self.scale_factor = min(10, self.scale_factor + 0.01)
self.update_circles()
def on_save_click(self):
cv2.destroyAllWindows()
def run(self):
logging.info("Opening the preview window...")
try:
while True:
# Display the initial circles
self.update_circles()
key = cv2.waitKey(0)
if key == ord("q"):
self.on_save_click()
cv2.destroyAllWindows()
logging.info(f"Q was pressed. Closing the preview window")
break
elif key == ord("n"):
self.on_minus_click()
logging.info(
f"N was pressed. Down-scaling fixations preview: scale factor is {self.scale_factor}"
)
elif key == ord("m"):
self.on_plus_click()
logging.info(
f"M was pressed. Up-scaling fixations preview: scale factor is {self.scale_factor}"
)
except KeyboardInterrupt:
pass
finally:
cv2.destroyAllWindows()
return self.scale_factor
# Resize gaze/fixation & reference img for faster video processing later
def scale_coordinates(rim_data, filepaths, scale_factor):
    """Halve all gaze/fixation pixel coordinates and the reference image size.

    Also multiplies fixation durations by *scale_factor* (the circle size the
    user chose in the preview). Mutates rim_data in place.
    """
    gaze = rim_data["gaze.csv"][0]
    original_bgr = cv2.imread(str(filepaths["reference_image.jpeg"]))
    original_rgb = cv2.cvtColor(original_bgr, cv2.COLOR_BGR2RGB)
    # Halve every coordinate column so it matches the half-size image below
    for col in (
        "fix x",
        "fix y",
        "gaze position in reference image x [px]",
        "gaze position in reference image y [px]",
    ):
        gaze[col] = gaze[col] / 2
    # Fixation duration doubles as circle radius; apply the chosen scale
    gaze["fix d"] = gaze["fix d"] * scale_factor
    half_size = (original_rgb.shape[1] // 2, original_rgb.shape[0] // 2)
    ref_img = cv2.resize(
        original_rgb,
        dsize=half_size,
        interpolation=cv2.INTER_CUBIC,
    )
    rim_data["gaze.csv"] = [gaze]
    rim_data["reference_image.jpeg"] = [ref_img]
    logging.info(
        "Reference image, gaze and fixation coordinates scaled to speed up video processing"
    )
# create and set path to folder to store (static) reference videos
def path_to_ref_video(rim_folder):
    """Ensure <rim_folder>/ref_video exists, chdir into it, and return its path."""
    output_path = os.path.join(rim_folder, "ref_video")
    if os.path.exists(output_path):
        logging.info("ref_video folder already exists. Moving to the next step")
    else:
        os.makedirs(output_path)
        logging.info("New folder created: ref_video")
    # later video writes use relative filenames, so move into the folder
    os.chdir(output_path)
    return output_path
# create and set path to folder to store final scanpath videos
def path_to_scanpath_video(rim_folder):
    """Ensure <rim_folder>/scanpath exists, chdir into it, and return its path."""
    output_path = os.path.join(rim_folder, "scanpath")
    if os.path.exists(output_path):
        logging.info("scanpath folder already exists. Moving to the next step")
    else:
        os.makedirs(output_path)
        logging.info("New folder created: scanpath")
    # later video/image writes use relative filenames, so move into the folder
    os.chdir(output_path)
    return output_path
# Create (static) ref videos to draw on each frame later
def create_ref_video(rim_data, rim_folder, subj):
    """Render a static video of the reference image for one subject.

    The duration matches the subject's recording span and the frame rate
    equals their gaze sample count divided by that span, so one video frame
    corresponds to one gaze row. Writes <subj>_ref.mp4 into the ref_video
    folder (cwd after path_to_ref_video).
    """
    path_to_ref_video(rim_folder)
    image = rim_data["reference_image.jpeg"][0]
    gaze = rim_data["gaze.csv"][0]
    subj_gaze = gaze[gaze["names"] == subj]
    # iloc[:, 2] is presumed to be 'timestamp [ns]' from the RIM gaze.csv
    # column layout — TODO confirm against the export.
    start = subj_gaze.iloc[0, 2]
    end = subj_gaze.iloc[-1, 2]
    count = subj_gaze["section id"].shape[0]
    time = (end - start) / 1e9  # convert nanoseconds to seconds
    framerate = count / time  # one frame per gaze sample
    filename = f"{subj}_ref.mp4"
    myclip = ImageClip(image)
    myclip.set_duration(time).write_videofile(
        filename,
        fps=framerate,
        verbose=False,
        logger=None,
    )
    # BUGFIX: the log message contained a scraping artifact "(unknown)";
    # report which subject's reference video was written.
    logging.info(f"Static reference video ({subj}) created")
# generate random colors for scanpath
def color_generator(names):
    """Return {subject name: random (B, G, R) tuple} for scanpath drawing."""
    colors = {}
    for subj in names:
        colors[subj] = (
            random.randint(0, 255),
            random.randint(0, 255),
            random.randint(0, 255),
        )
    logging.info("Random colors generated")
    return colors
@contextmanager
def capture_frame(ref_video_path):
    """Yield a cv2.VideoCapture for *ref_video_path*; always release it on exit."""
    video_capture = cv2.VideoCapture(ref_video_path)
    try:
        yield video_capture
    finally:
        # free the underlying video handle even if the body raised
        video_capture.release()
class Writer_frame:
    """Context manager wrapping a cv2.VideoWriter.

    On enter it copies fps and frame size from the source capture, so the
    written frames match the reference video exactly; on exit the writer is
    released (flushing the output file).
    """

    def __init__(self, scan_video_path, fourcc, cap):
        self.scan_video_path = scan_video_path
        self.fourcc = fourcc
        self.cap = cap

    def __enter__(self):
        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.frame_size = (width, height)
        self.out = cv2.VideoWriter(
            self.scan_video_path, self.fourcc, self.fps, self.frame_size
        )
        return self.out

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # close the output file regardless of errors in the with-body
        self.out.release()
# draw a semi-transparent filled circle to represent fixation duration
def draw_semitransp_fill_fixation_circle(frame, fixations, overlay, color, alpha):
    """Fill every stored fixation circle on *overlay*, then alpha-blend it onto
    *frame*; returns the blended frame. Circle radius encodes fixation duration."""
    for cx, cy, dur in zip(fixations["x"], fixations["y"], fixations["d"]):
        cv2.circle(
            img=overlay,
            center=(cx, cy),
            radius=int(dur),
            color=color,
            thickness=-1,
            lineType=cv2.LINE_AA,
        )
    return cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)
# draw the outline for the fixation circle
def draw_outline_fixation_circle(
    frame,
    fixations,
    color,
):
    """Draw an anti-aliased outline (thickness 3) for every stored fixation circle."""
    for cx, cy, dur in zip(fixations["x"], fixations["y"], fixations["d"]):
        cv2.circle(
            img=frame,
            center=(cx, cy),
            radius=int(dur),
            color=color,
            thickness=3,
            lineType=cv2.LINE_AA,
        )
# draw scanpath lines connecting each fixation circles
def draw_scanpath_lines(frame, fixations, color):
    """Connect consecutive fixation centers with straight lines (thickness 3)."""
    xs, ys = fixations["x"], fixations["y"]
    # start at 1: each segment joins fixation k-1 to fixation k
    for k in range(1, len(fixations["n"])):
        current = (int(xs[k]), int(ys[k]))
        previous = (int(xs[k - 1]), int(ys[k - 1]))
        cv2.line(img=frame, pt1=current, pt2=previous, color=color, thickness=3)
# draw fixation nr, black nr with white outline
def draw_fixation_nr(frame, fixations, font, font_size, font_thick_b, font_thick_w):
    """Write each fixation's number centered on its circle.

    Two passes per label: a thick white pass underneath and a thin black pass
    on top, so the id stays readable against any background.
    """
    for k in range(len(fixations["n"])):
        cx, cy = fixations["x"][k], fixations["y"][k]
        label = f"{fixations['n'][k]}"
        # size with the thin (black) thickness, then center on the fixation
        (label_w, label_h), _baseline = cv2.getTextSize(
            label, font, font_size, font_thick_b
        )
        origin = (int(cx - label_w / 2), int(cy + label_h / 2))
        for thickness, text_color in (
            (font_thick_w, (255, 255, 255)),  # white outline first
            (font_thick_b, (0, 0, 0)),  # black number on top
        ):
            cv2.putText(
                img=frame,
                text=label,
                org=origin,
                fontFace=font,
                fontScale=font_size,
                color=text_color,
                thickness=thickness,
                lineType=cv2.LINE_AA,
            )
# draw legend with corresponding scanpath color and subject name
def draw_name_legend(frame, color, subj, font, font_size, font_thick_b, font_thick_w):
    """Draw a white legend box in the top-right corner with a colored line and
    the subject's name.

    All geometry is derived from the frame width; adjust the box/line points
    below to move or resize the legend.
    """
    w = frame.shape[1]
    # white box corners (bottom-right / top-left), sized relative to width
    r_end_point = (int(w - (w / 50)), int(w / 25))
    r_start_point = (int(w - (w / 5)), int(w / 50))
    ry = r_end_point[1] - r_start_point[1]  # box height
    mid_y = int(r_start_point[1] + ry / 2)
    l_end_point = (int(r_start_point[0] + ry * 2), mid_y)
    l_start_point = (int(r_start_point[0] + ry / 2), mid_y)
    cv2.rectangle(
        img=frame,
        pt1=r_start_point,
        pt2=r_end_point,
        color=(255, 255, 255),
        thickness=-1,
        lineType=cv2.LINE_AA,
    )
    cv2.line(
        img=frame,
        pt1=l_start_point,
        pt2=l_end_point,
        color=color,
        thickness=3,
        lineType=cv2.LINE_AA,
    )
    label = f"{subj}"
    (label_w, label_h), _baseline = cv2.getTextSize(
        label, font, font_size, font_thick_b
    )
    text_origin = (
        int(l_end_point[0] + label_w / 4),
        int(l_end_point[1] + label_h / 2),
    )
    cv2.putText(
        img=frame,
        text=label,
        org=text_origin,
        fontFace=font,
        fontScale=font_size,
        color=(0, 0, 0),
        thickness=1,
        lineType=cv2.LINE_AA,
    )
# draw legend with corresponding scanpath color and subject name
def draw_all_names_legend(
    frame, colors, names, font, font_size, font_thick_b, font_thick_w
):
    """Draw one shared legend box listing every subject with its scanpath color.

    The box height grows with the number of subjects; each row shows a line in
    the subject's color followed by the subject's name.
    """
    w = frame.shape[1]
    r_start_point = (int(w - (w / 5)), 0)
    r_end_point = (int(w - (w / 50)), int(w / 50) * len(names))
    row_h = int(w / 50) - r_start_point[1]  # vertical space per subject row
    cv2.rectangle(
        img=frame,
        pt1=r_start_point,
        pt2=r_end_point,
        color=(255, 255, 255),
        thickness=-1,
        lineType=cv2.LINE_AA,
    )
    offset = 0
    for subj in names:
        row_y = int(r_start_point[1] + row_h / 2 + offset)
        l_end_point = (int(r_start_point[0] + row_h * 2), row_y)
        l_start_point = (int(r_start_point[0] + row_h / 2), row_y)
        cv2.line(
            img=frame,
            pt1=l_start_point,
            pt2=l_end_point,
            color=colors[subj],
            thickness=3,
            lineType=cv2.LINE_AA,
        )
        label = f"{subj}"
        (label_w, label_h), _baseline = cv2.getTextSize(
            label, font, font_size, font_thick_b
        )
        text_origin = (
            int(l_end_point[0] + label_w / 4),
            int(l_end_point[1] + label_h / 2),
        )
        cv2.putText(
            img=frame,
            text=label,
            org=text_origin,
            fontFace=font,
            fontScale=font_size,
            color=(0, 0, 0),
            thickness=1,
            lineType=cv2.LINE_AA,
        )
        offset += row_h
# draw red gaze overlay
def draw_gaze_overlay(frame, xg, yg, i):
    """Draw the current gaze sample (index *i*) as a red ring, radius 50 px."""
    gaze_center = (int(xg[i]), int(yg[i]))
    cv2.circle(
        img=frame,
        center=gaze_center,
        radius=50,
        color=(0, 0, 255),
        thickness=10,
    )
# main function to extract each frame from reference video, draw scanpath and gaze overlay, and then store each processed frame into a list
def draw_on_frame(
    rim_data, ref_video_path, scan_video_path, subj, colors, fixations_df, path_to_scanpath
):
    """Render one subject's scanpath video frame by frame.

    Reads the static reference video (one frame per gaze row), accumulates
    fixation circles/lines/numbers as fixations appear, draws the legend and
    gaze overlay on each frame, writes everything to <subj>_scanpath.mp4,
    saves the final scanpath image, and stores the accumulated fixation dict
    in *fixations_df* for the aggregated image.
    """
    # text aesthetics
    font = cv2.FONT_HERSHEY_DUPLEX
    font_size = 1
    font_thick_w = 3
    font_thick_b = 1
    # accumulators for the scanpath drawn so far
    fixations = {}  # fixation storage
    fixations["x"] = []  # fix x
    fixations["y"] = []  # fix y
    fixations["n"] = []  # fix nr
    fixations["d"] = []  # fix duration
    fixations["refimg"] = []  # fix detected in ref img
    # load processed gaze.csv, restricted to this subject
    gaze = rim_data["gaze.csv"][0]
    gaze = gaze[gaze["names"] == subj].reset_index(drop=True)
    # NOTE(review): columns are addressed positionally below — presumably
    # 4/5 = gaze x/y, 6 = fixation id, 8/9 = fix x/y, 10 = fix d,
    # 11 = fix in refimg; verify against the processed gaze.csv layout.
    xg = gaze.iloc[:, 4]
    yg = gaze.iloc[:, 5]
    # one reference-video frame was generated per gaze row (see create_ref_video)
    frame_nr = gaze[gaze["names"] == subj]["section id"].shape[0]
    progress_bar = tqdm(total=frame_nr)
    # last fixation id with valid coordinates: triggers the final jpeg save
    last_fix_id = int(np.nanmax(gaze[(gaze["names"] == subj) & (~gaze["fix x"].isna())]["fixation id"]))
    i = 0
    with capture_frame(ref_video_path) as cap, Writer_frame(
        scan_video_path, cv2.VideoWriter_fourcc(*"mp4v"), cap
    ) as scanpath_video:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # create a copy of the frame, to use it as overlay for semi-transparent filled circles
            overlay = frame.copy()
            # a fixation is detected, and it's on the reference image
            if (
                not math.isnan(gaze[gaze["names"] == subj].iloc[i, 6])
                and gaze[gaze["names"] == subj].iloc[i, 11]
            ):
                x, y, n, d, refimg = (
                    gaze[gaze["names"] == subj].iloc[i, 8],
                    gaze[gaze["names"] == subj].iloc[i, 9],
                    gaze[gaze["names"] == subj].iloc[i, 6],
                    gaze[gaze["names"] == subj].iloc[i, 10],
                    gaze[gaze["names"] == subj].iloc[i, 11],
                )
                # NOTE(review): appended once per gaze row, so a fixation that
                # spans many rows is stored repeatedly — confirm this
                # duplication is intended (redrawing over the same pixels).
                fixations["x"].append(int(x))
                fixations["y"].append(int(y))
                fixations["n"].append(int(n))
                fixations["d"].append(int(d))
                fixations["refimg"].append(refimg)
            # redraw the whole accumulated scanpath on this frame
            frame = draw_semitransp_fill_fixation_circle(
                frame, fixations, overlay, colors[subj], 0.5
            )
            draw_outline_fixation_circle(frame, fixations, colors[subj])
            draw_scanpath_lines(
                frame,
                fixations,
                colors[subj],
            )
            draw_fixation_nr(
                frame, fixations, font, font_size, font_thick_b, font_thick_w
            )
            draw_name_legend(
                frame,
                colors[subj],
                subj,
                font,
                font_size,
                font_thick_b,
                font_thick_w,
            )
            # Save the scanpath when last fix is drawn, before gaze overlay
            save_scanpath_image(fixations, frame, subj, last_fix_id, path_to_scanpath)
            # gaze overlay, only when gaze was detected on reference image
            if not math.isnan(xg[i]):
                draw_gaze_overlay(frame, xg, yg, i)
            else:
                pass
            # write drawn frame
            scanpath_video.write(frame)
            progress_bar.set_description(f"Processing {subj}_scanpath.mp4")
            progress_bar.update(1)
            i += 1
            if i == frame_nr:
                store_fixations_for_all_subjects(fixations_df, fixations, subj)
                logging.info(f"{subj}_scanpath.mp4 saved!")
                break
# Create and save scanpath image
def save_scanpath_image(fixations, frame, subj, last_fix_id, path_to_scanpath):
    """Save the current frame as <subj>_scanpath.jpeg once the last fixation
    has been drawn.

    Called every frame; only the first frame whose newest fixation equals
    *last_fix_id* writes the file (the existence check keeps it write-once).
    """
    if fixations["n"] and fixations["n"][-1] == last_fix_id:
        target = os.path.join(path_to_scanpath, f"{subj}_scanpath.jpeg")
        if not os.path.exists(target):
            # BUGFIX: the existence check used the full path but the write used
            # a cwd-relative filename; write to the same full path so the image
            # always lands in the scanpath folder.
            cv2.imwrite(target, frame)
            logging.info(f"{subj}_scanpath.jpeg saved!")
def store_fixations_for_all_subjects(fixations_df, fixations, subj):
    """Keep this subject's accumulated fixation dict (single-element list,
    matching the rim_data layout) for the aggregated scanpath image."""
    fixations_df[subj] = [fixations]
def create_aggregated_scanpaths(fixations_df, colors, rim_data, names):
    """Draw every subject's scanpath onto one copy of the reference image and
    save it as general_scanpath.jpeg (written to the current working
    directory — the scanpath folder after path_to_scanpath_video)."""
    # text aesthetics
    font = cv2.FONT_HERSHEY_DUPLEX
    font_size = 1
    font_thick_w = 3
    font_thick_b = 1
    # reference image was stored as RGB earlier; swap channels for OpenCV output
    canvas = cv2.cvtColor(rim_data["reference_image.jpeg"][0], cv2.COLOR_BGR2RGB)
    for subj in names:
        subj_fixations = fixations_df[subj][0]
        overlay = canvas.copy()
        canvas = draw_semitransp_fill_fixation_circle(
            canvas, subj_fixations, overlay, colors[subj], 0.5
        )
        draw_outline_fixation_circle(canvas, subj_fixations, colors[subj])
        draw_scanpath_lines(canvas, subj_fixations, colors[subj])
    # shared legend on top of all scanpaths
    draw_all_names_legend(
        canvas, colors, names, font, font_size, font_thick_b, font_thick_w
    )
    cv2.imwrite("general_scanpath.jpeg", canvas)
    logging.info("general_scanpath.jpeg saved!")
def remove_ref_video_folder(rim_folder):
    """Delete the temporary ref_video folder once all scanpath videos are done.

    BUGFIX: the previous implementation called path_to_ref_video(), which
    chdir'd INTO the folder (and recreated it when missing) right before
    deleting it, leaving the process with a deleted working directory.
    """
    # make sure the cwd is not inside the folder being removed
    os.chdir(rim_folder)
    # ignore_errors keeps this a no-op when the folder is already gone
    shutil.rmtree(os.path.join(rim_folder, "ref_video"), ignore_errors=True)
def main():
    """Run the full pipeline: pick the export folder, preprocess the CSVs,
    preview the fixation-circle scale, then render per-subject scanpath
    videos/images and (for multiple subjects) an aggregated scanpath image."""
    rim_folder = select_folder()
    filepaths = paths_to_rim_files(rim_folder)
    scan_recording_id = check_sections_csv(filepaths)
    rim_data = reset_fixation_id(filepaths)
    set_name(rim_data, filepaths)
    add_fix_xyd(rim_data)
    scaler = CircleScaler(
        str(filepaths["reference_image.jpeg"]),
        rim_data,
    )
    scale_factor = scaler.run()
    scale_coordinates(rim_data, filepaths, scale_factor)
    fixations = rim_data["fixations.csv"][0]
    # Keep only recordings where fixations were detected in the reference image
    names = []
    for name in fixations["names"].unique():
        name_df = fixations[fixations["names"] == name]
        detected = name_df["fixation detected in reference image"]
        # Drop the wearer only when every fixation row is False
        if detected.nunique() == 1 and (detected == False).all():
            continue
        names.append(name)
    # Pair wearer names with recording ids (order-aligned) and drop the
    # scanning recording if flagged.
    # NOTE(review): check_sections_csv returns True/False/None rather than a
    # recording id, so this membership test rarely matches — kept as-is apart
    # from the list-filtering fix below; verify the intended exclusion logic.
    ids = fixations["recording id"].unique()
    for pair in list(zip(names, ids)):
        if scan_recording_id in pair:
            # BUGFIX: was `names = names[names != pair[0]]`, which indexed the
            # list with a boolean instead of removing the matching entry.
            names = [n for n in names if n != pair[0]]
    colors = color_generator(names)
    path_to_scanpath = path_to_scanpath_video(rim_folder)
    fixations_df = {}
    for subj in names:
        create_ref_video(rim_data, rim_folder, subj)
        ref_video_path = os.path.join(
            path_to_ref_video(rim_folder),
            f"{subj}_ref.mp4",
        )
        scan_video_path = os.path.join(
            path_to_scanpath_video(rim_folder),
            f"{subj}_scanpath.mp4",
        )
        draw_on_frame(
            rim_data,
            ref_video_path,
            scan_video_path,
            subj,
            colors,
            fixations_df,
            path_to_scanpath,
        )
    # the aggregated image only makes sense with more than one subject
    if len(names) > 1:
        create_aggregated_scanpaths(fixations_df, colors, rim_data, names)
    remove_ref_video_folder(rim_folder)


# guard the entry point so importing this module does not launch the GUI
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment