|
import os |
|
import cv2 |
|
import math |
|
import random |
|
import shutil |
|
import logging |
|
import numpy as np |
|
import pandas as pd |
|
import tkinter as tk |
|
from tqdm import tqdm |
|
from tkinter import ttk |
|
from pathlib import Path |
|
from tkinter import messagebox |
|
from tkinter import filedialog |
|
from rich.logging import RichHandler |
|
from moviepy.editor import ImageClip |
|
from contextlib import contextmanager |
|
|
|
# Send every log record of the root logger through Rich, showing only the
# message text (timestamps use the "[%X]" date format).
logging.basicConfig(
    level=logging.INFO,
    format="%(message)s",
    datefmt="[%X]",
    handlers=[RichHandler()],
)
# Handle on the root logger, kept at module level.
logger = logging.getLogger()
|
|
|
|
|
# select the RIM export folder |
|
def select_folder():
    """Ask the user for a Reference Image Mapper export folder and validate it.

    A folder is accepted only when it contains every file listed in
    ``required_files``.

    Returns:
        The folder path returned by ``filedialog.askdirectory``.

    Raises:
        Exception: If the dialog is cancelled or a required file is missing.
    """
    required_files = [
        "fixations.csv",
        "gaze.csv",
        "reference_image.jpeg",
        "sections.csv",
        "enrichment_info.txt",
    ]

    # The root window only hosts the dialog; hide it and dispose of it once the
    # dialog is closed so no invisible Tk instance outlives this function.
    root = tk.Tk()
    root.withdraw()
    try:
        rim_folder = filedialog.askdirectory()
    finally:
        root.destroy()

    # A cancelled dialog yields an empty value; without this check the file
    # test below would silently run against the current working directory.
    folder_is_valid = bool(rim_folder) and all(
        os.path.isfile(os.path.join(rim_folder, file)) for file in required_files
    )
    if not folder_is_valid:
        raise Exception(
            "Wrong folder! Please select a Reference Image Mapper export folder"
        )
    logging.info(f"RIM export folder: {rim_folder}")

    return rim_folder
|
|
|
|
|
# set paths to files from RIM export folder |
|
def paths_to_rim_files(rim_folder):
    """Map each export file name used by the pipeline to its path.

    Args:
        rim_folder: Folder that contains the export files.

    Returns:
        Dict keyed by file name whose values are ``Path`` objects located
        directly inside ``rim_folder``.
    """
    export_root = Path(rim_folder)
    filepaths = {}
    for file_name in (
        "fixations.csv",
        "sections.csv",
        "gaze.csv",
        "reference_image.jpeg",
    ):
        filepaths[file_name] = export_root / file_name

    return filepaths
|
|
|
|
|
import tkinter as tk |
|
|
|
# Initialize the selected_id as None by default |
|
# Answer given in the "Include Scanning Recording?" dialog: True for "Yes",
# False for "No", None as long as the dialog has not been answered.
selected_id = None


def check_sections_csv(filepaths):
    """Ask whether the scanning recording is among the listed recordings.

    The dialog is only shown when sections.csv holds more than one distinct
    recording id and contains both a "recording.begin" start event and a
    "recording.end" end event.

    Args:
        filepaths: Mapping that provides the path of "sections.csv".

    Returns:
        The module-level ``selected_id``: True ("Yes"), False ("No"), or its
        previous value (None initially) when no dialog was shown or the window
        was closed without answering.
    """
    df = pd.read_csv(filepaths["sections.csv"])

    # `and` short-circuits, so the event columns are only read when needed,
    # exactly like the nested checks this replaces.
    needs_prompt = (
        df["recording id"].nunique() > 1
        and "recording.begin" in df["start event name"].unique()
        and "recording.end" in df["end event name"].unique()
    )
    if not needs_prompt:
        return selected_id

    root = tk.Tk()
    root.title("Include Scanning Recording?")
    # Closing the window via the window manager would destroy it and make the
    # root.destroy() below raise TclError; only leave the main loop instead.
    root.protocol("WM_DELETE_WINDOW", root.quit)

    def include_scanning():
        global selected_id
        selected_id = True  # 'Yes' was pressed
        root.quit()

    def exclude_scanning():
        global selected_id
        selected_id = False  # 'No' was pressed
        root.quit()

    frame = tk.Frame(root)
    frame.pack(padx=20, pady=10)

    message_label = tk.Label(
        frame,
        text="Is the scanning recording included in the following list?"
    )
    message_label.pack()

    # One listbox entry per row of sections.csv. itertuples() puts the index at
    # position 0, so row[2], row[3] and row[5] are the 2nd, 3rd and 5th columns.
    row_listbox = tk.Listbox(frame, width=100)
    for row in df.itertuples():
        row_listbox.insert(
            tk.END,
            f"Id: {row[2]} | Recording name: {row[3]} | Wearer: {row[5]}",
        )
    row_listbox.pack()

    yes_button = tk.Button(frame, text="Yes", command=include_scanning)
    no_button = tk.Button(frame, text="No", command=exclude_scanning)
    yes_button.pack(side="left", padx=10)
    no_button.pack(side="left", padx=10)

    root.mainloop()
    root.destroy()

    return selected_id
|
|
|
# change fixation nr to start from 1 for each subject |
|
def reset_fixation_id(filepaths):
    """Load the csv exports and renumber fixation ids from 1 per section.

    Every key of ``filepaths`` ending in ".csv", except "sections.csv", is
    read. Within each section the smallest fixation id becomes 1. The
    renumbered "fixation id" column ends up as the last column of the frame.

    Args:
        filepaths: Mapping of file name to file path.

    Returns:
        Dict mapping each processed file name to a one-element list that holds
        its DataFrame.
    """
    rim_data = {}
    tables = {}
    for file_name in filepaths:
        if file_name.endswith(".csv") and file_name != "sections.csv":
            tables[file_name] = pd.read_csv(filepaths[file_name])

    for file_name, data in tables.items():
        # Work on a copy of the ids; it replaces the original column below.
        data["fixation id new"] = data["fixation id"]

        for section_id in data["section id"].unique():
            in_section = data["section id"] == section_id
            min_fixation_id = data.loc[in_section, "fixation id"].min()
            # Shift the ids so the smallest one of this section becomes 1.
            shifted = data.loc[in_section, "fixation id new"] - min_fixation_id + 1
            data.loc[in_section, "fixation id new"] = shifted

        # Swap the old id column for the renumbered one.
        data.drop(columns=["fixation id"], inplace=True)
        data.rename(columns={"fixation id new": "fixation id"}, inplace=True)
        rim_data[file_name] = [data]

    logging.info("Set fixation nr to start from 1")
    return rim_data
|
|
|
|
|
# add subject name to data |
|
def set_name(rim_data, filepaths):
    """Add a "names" column with the wearer name to the fixation and gaze data.

    Wearer names that occur more than once in sections.csv are made unique by
    appending "_1", "_2", ... in row order, and the updated table is written
    back to sections.csv.

    Names are assigned by looking up each row's "section id" in sections.csv,
    so the result does not depend on the order in which sections appear in the
    data files. If sections.csv has no "section id" column, names are paired
    with the sections in order of appearance instead.

    Args:
        rim_data: Dict of one-element lists holding the loaded DataFrames;
            updated in place.
        filepaths: Mapping that provides the path of "sections.csv".
    """
    sections = pd.read_csv(filepaths["sections.csv"])
    items_to_include = ["fixations.csv", "gaze.csv"]

    # keep=False flags every member of a duplicated name, not only the repeats.
    duplicates = sections.duplicated(subset=["wearer name"], keep=False)
    if duplicates.any():
        counts = {}
        for i, row in sections.iterrows():
            if not duplicates[i]:
                continue
            name = row["wearer name"]
            counts[name] = counts.get(name, 0) + 1
            sections.at[i, "wearer name"] = f"{name}_{counts[name]}"

        # Persist the disambiguated names only when something changed.
        sections.to_csv(filepaths["sections.csv"], index=False)

    names = list(sections["wearer name"].unique())
    name_by_section = None
    if "section id" in sections.columns:
        name_by_section = dict(zip(sections["section id"], sections["wearer name"]))

    for file in rim_data:
        if file not in items_to_include:
            continue
        data = rim_data[file][0]
        if name_by_section is not None:
            # Join on the section id; sections missing from sections.csv get NaN.
            data["names"] = data["section id"].map(name_by_section)
        else:
            # Fallback: pair names and sections by order of appearance.
            for idx, name in zip(data["section id"].unique(), names):
                data.loc[data["section id"] == idx, ["names"]] = name
        rim_data[file] = [data]
    logging.info("Wearer names added to fixation and gaze files")
|
|
|
|
|
# add fixation x, y, duration and detected on ref img to gaze.csv |
|
def add_fix_xyd(rim_data):
    """Copy fixation position, duration and detection flag onto the gaze rows.

    Four columns are appended to the gaze frame, in this order: "fix x",
    "fix y", "fix d" and "fix in refimg". A gaze row receives the values of the
    first fixation row with the same "section id" and "fixation id"; rows
    without a match keep None.

    Args:
        rim_data: Dict of one-element lists holding the "gaze.csv" and
            "fixations.csv" DataFrames; updated in place.
    """
    gaze = rim_data["gaze.csv"][0]
    fixations = rim_data["fixations.csv"][0]

    source_columns = [
        "fixation x [px]",
        "fixation y [px]",
        "duration [ms]",
        "fixation detected in reference image",
    ]
    target_columns = ["fix x", "fix y", "fix d", "fix in refimg"]

    # Index the fixations once instead of filtering the whole frame for every
    # gaze row. setdefault keeps the first row of each (section, fixation) key.
    lookup = {}
    fixation_keys = zip(
        fixations["section id"].tolist(), fixations["fixation id"].tolist()
    )
    fixation_values = zip(*(fixations[column].tolist() for column in source_columns))
    for key, values in zip(fixation_keys, fixation_values):
        # Missing ids never match anything, as with an `==` comparison.
        if pd.isna(key[0]) or pd.isna(key[1]):
            continue
        lookup.setdefault(key, values)

    no_match = (None,) * len(target_columns)
    gaze_keys = zip(gaze["section id"].tolist(), gaze["fixation id"].tolist())
    matched = [lookup.get(key, no_match) for key in gaze_keys]

    for position, column in enumerate(target_columns):
        # dtype=object keeps None as None rather than converting it to NaN.
        gaze[column] = pd.Series(
            [values[position] for values in matched], index=gaze.index, dtype=object
        )

    rim_data["gaze.csv"] = [gaze]
    rim_data["fixations.csv"] = [fixations]

    logging.info("Fixations info merged to gaze file")
|
|
|
class CircleScaler:
    """Interactive preview for choosing the size of the fixation circles.

    Two filled circles are drawn over the reference image: a red one whose
    radius follows the longest fixation duration and a green one for the
    shortest. In the preview window, "m" raises the scale factor by 0.01 (up to
    10), "n" lowers it by 0.01 (down to 0.001) and "q" closes the preview.
    ``run`` returns the scale factor that was selected.
    """

    WINDOW_TITLE = (
        "Fixation circles preview (red = longer fixation; green = shorter fixation)"
        " - please press m to upscale, n to down-scale and q to save"
    )

    def __init__(self, img_path, rim_data):
        """Load the image and the duration range, then show the first preview.

        Args:
            img_path: Path of the reference image, passed to ``cv2.imread``.
            rim_data: Dict whose "gaze.csv" entry holds the gaze DataFrame with
                a "fix d" column.

        Raises:
            FileNotFoundError: If the image cannot be read.
        """
        self.img = cv2.imread(img_path)
        if self.img is None:
            # cv2.imread reports failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read reference image: {img_path}")
        self.img_height = self.img.shape[0]
        self.img_width = self.img.shape[1]
        # Circle centres: vertically centred, at 3/4 (max) and 1/4 (min) width.
        self.max_xc = int((self.img_width / 4) * 3)
        self.min_xc = int(self.img_width / 4)
        self.yc = int(self.img_height / 2)
        self.gaze = rim_data["gaze.csv"][0]
        self.max_d = self.gaze["fix d"].max()
        self.min_d = self.gaze["fix d"].min()
        self.scale_factor = 1.0
        self.update_circles()

    def __enter__(self):
        """Return the scaler so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Close every OpenCV window when the ``with`` block ends."""
        cv2.destroyAllWindows()

    @staticmethod
    def _display_ratio(img_width, img_height, display_width, display_height):
        """Return the factor that fits the image on the display (1.0 if it fits)."""
        if img_height > display_height or img_width > display_width:
            return min(display_width / img_width, display_height / img_height)
        return 1.0

    def get_display_dimensions(self):
        """Return the screen size as ``(width, height)`` queried through Tk."""
        root = tk.Tk()
        root.withdraw()  # Hide the main window

        screen_width = root.winfo_screenwidth()
        screen_height = root.winfo_screenheight()

        root.destroy()

        return screen_width, screen_height

    def update_circles(self):
        """Redraw both preview circles at the current scale and show them."""
        img_copy = self.img.copy()

        display_width, display_height = self.get_display_dimensions()

        # The ratio stays 1.0 for images that already fit on screen, so the
        # drawing code below works for small images as well.
        aspect_ratio = self._display_ratio(
            img_copy.shape[1], img_copy.shape[0], display_width, display_height
        )
        if aspect_ratio != 1.0:
            img_copy = cv2.resize(
                img_copy,
                (
                    int(img_copy.shape[1] * aspect_ratio),
                    int(img_copy.shape[0] * aspect_ratio),
                ),
            )

        # Red circle: longest fixation.
        max_radius = int((self.max_d * self.scale_factor) * aspect_ratio)
        cv2.circle(
            img_copy,
            (int(self.max_xc * aspect_ratio), int(self.yc * aspect_ratio)),
            max_radius,
            (0, 0, 255),
            -1,
        )

        # Green circle: shortest fixation.
        min_radius = int((self.min_d * self.scale_factor) * aspect_ratio)
        cv2.circle(
            img_copy,
            (int(self.min_xc * aspect_ratio), int(self.yc * aspect_ratio)),
            min_radius,
            (0, 255, 0),
            -1,
        )

        cv2.imshow(self.WINDOW_TITLE, img_copy)

    def on_minus_click(self):
        """Lower the scale factor by 0.01 (not below 0.001) and redraw."""
        self.scale_factor = max(0.001, self.scale_factor - 0.01)
        self.update_circles()

    def on_plus_click(self):
        """Raise the scale factor by 0.01 (not above 10) and redraw."""
        self.scale_factor = min(10, self.scale_factor + 0.01)
        self.update_circles()

    def on_save_click(self):
        """Close the preview window."""
        cv2.destroyAllWindows()

    def run(self):
        """Run the key loop until "q" is pressed and return the scale factor."""
        logging.info("Opening the preview window...")
        try:
            while True:
                self.update_circles()

                # Keep only the low byte of the key code before comparing.
                key = cv2.waitKey(0) & 0xFF
                if key == ord("q"):
                    self.on_save_click()
                    logging.info("Q was pressed. Closing the preview window")
                    break
                elif key == ord("n"):
                    self.on_minus_click()
                    logging.info(
                        f"N was pressed. Down-scaling fixations preview: scale factor is {self.scale_factor}"
                    )
                elif key == ord("m"):
                    self.on_plus_click()
                    logging.info(
                        f"M was pressed. Up-scaling fixations preview: scale factor is {self.scale_factor}"
                    )
        except KeyboardInterrupt:
            # Ctrl+C ends the preview and keeps the current scale factor.
            pass
        finally:
            cv2.destroyAllWindows()

        return self.scale_factor
|
|
|
|
|
# Resize gaze/fixation & reference img for faster video processing later |
|
def scale_coordinates(rim_data, filepaths, scale_factor):
    """Halve the gaze/fixation coordinates and the reference image size.

    The "fix d" column is multiplied by ``scale_factor``. The reference image
    is read from disk, converted with ``cv2.COLOR_BGR2RGB``, resized to half
    its width and height and stored under ``rim_data["reference_image.jpeg"]``.

    Args:
        rim_data: Dict of one-element lists; its "gaze.csv" frame is updated.
        filepaths: Mapping that provides the path of "reference_image.jpeg".
        scale_factor: Multiplier applied to the fixation durations.
    """
    gaze = rim_data["gaze.csv"][0]
    original_bgr = cv2.imread(str(filepaths["reference_image.jpeg"]))
    original_rgb = cv2.cvtColor(original_bgr, cv2.COLOR_BGR2RGB)

    for column in ("fix x", "fix y"):
        gaze[column] = gaze[column] / 2
    gaze["fix d"] = gaze["fix d"] * scale_factor
    for column in (
        "gaze position in reference image x [px]",
        "gaze position in reference image y [px]",
    ):
        gaze[column] = gaze[column] / 2

    height, width = original_rgb.shape[0], original_rgb.shape[1]
    half_size = (int(width / 2), int(height / 2))
    ref_img = cv2.resize(
        original_rgb, dsize=half_size, interpolation=cv2.INTER_CUBIC
    )

    rim_data["gaze.csv"] = [gaze]
    rim_data["reference_image.jpeg"] = [ref_img]

    logging.info(
        "Reference image, gaze and fixation coordinates scaled to speed up video processing"
    )
|
|
|
|
|
# create and set path to folder to store (static) reference videos |
|
def path_to_ref_video(rim_folder):
    """Ensure ``<rim_folder>/ref_video`` exists and make it the working directory.

    Args:
        rim_folder: Folder in which the "ref_video" sub-folder lives.

    Returns:
        Path of the "ref_video" folder.
    """
    output_path = os.path.join(rim_folder, "ref_video")
    if os.path.exists(output_path):
        logging.info("ref_video folder already exists. Moving to the next step")
    else:
        os.makedirs(output_path)
        logging.info("New folder created: ref_video")

    # The working directory is switched in both cases.
    os.chdir(output_path)

    return output_path
|
|
|
|
|
# create and set path to folder to store final scanpath videos |
|
def path_to_scanpath_video(rim_folder):
    """Ensure ``<rim_folder>/scanpath`` exists and make it the working directory.

    Args:
        rim_folder: Folder in which the "scanpath" sub-folder lives.

    Returns:
        Path of the "scanpath" folder.
    """
    output_path = os.path.join(rim_folder, "scanpath")
    if os.path.exists(output_path):
        logging.info("scanpath folder already exists. Moving to the next step")
    else:
        os.makedirs(output_path)
        logging.info("New folder created: scanpath")

    # The working directory is switched in both cases.
    os.chdir(output_path)
    return output_path
|
|
|
|
|
# Create (static) ref videos to draw on each frame later |
|
def create_ref_video(rim_data, rim_folder, subj):
    """Write a static video of the reference image for one subject.

    The clip lasts as long as the span between the subject's first and last
    gaze rows and has one frame per gaze row. It is written as
    ``<subj>_ref.mp4`` into the current working directory, which
    ``path_to_ref_video`` sets to the "ref_video" folder.

    Args:
        rim_data: Dict holding the reference image and the gaze DataFrame.
        rim_folder: Export folder, passed to ``path_to_ref_video``.
        subj: Value of the "names" column selecting the subject's gaze rows.
    """
    path_to_ref_video(rim_folder)
    image = rim_data["reference_image.jpeg"][0]
    gaze = rim_data["gaze.csv"][0]

    subject_gaze = gaze[gaze["names"] == subj]
    # Column 2 is read by position; presumably the sample timestamp in
    # nanoseconds, given the division by 1e9 below - TODO confirm.
    first_stamp = subject_gaze.iloc[0, 2]
    last_stamp = subject_gaze.iloc[-1, 2]
    sample_count = subject_gaze["section id"].shape[0]

    duration = (last_stamp - first_stamp) / 1e9
    framerate = sample_count / duration
    filename = f"{subj}_ref.mp4"

    static_clip = ImageClip(image).set_duration(duration)
    static_clip.write_videofile(
        filename,
        fps=framerate,
        verbose=False,
        logger=None,
    )

    logging.info(f"Static reference video(s) {filename} created")
|
|
|
|
|
# generate random colors for scanpath |
|
def color_generator(names):
    """Assign a random color to every subject.

    Args:
        names: Iterable of subject names.

    Returns:
        Dict mapping each name to a tuple of three random integers in 0..255.
    """
    colors = {}
    for subj in names:
        first = random.randint(0, 255)
        second = random.randint(0, 255)
        third = random.randint(0, 255)
        colors[subj] = (first, second, third)
    logging.info("Random colors generated")
    return colors
|
|
|
|
|
@contextmanager
def capture_frame(ref_video_path):
    """Open a video for reading and release it when the block ends.

    Args:
        ref_video_path: Path handed to ``cv2.VideoCapture``.

    Yields:
        The ``cv2.VideoCapture`` object.
    """
    video = cv2.VideoCapture(ref_video_path)
    try:
        yield video
    finally:
        # Runs on normal exit as well as when the block raises.
        video.release()
|
|
|
|
|
class Writer_frame:
    """Context manager that opens a video writer matching a capture's format.

    On entry the frame rate and frame size are read from ``cap`` and a
    ``cv2.VideoWriter`` for ``scan_video_path`` is created and returned; on
    exit the writer is released.
    """

    def __init__(self, scan_video_path, fourcc, cap):
        self.scan_video_path, self.fourcc, self.cap = scan_video_path, fourcc, cap

    def __enter__(self):
        source = self.cap
        self.fps = source.get(cv2.CAP_PROP_FPS)
        width = int(source.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(source.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.frame_size = (width, height)
        self.out = cv2.VideoWriter(
            self.scan_video_path, self.fourcc, self.fps, self.frame_size
        )
        return self.out

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.out.release()
|
|
|
|
|
# draw a semi-transparent filled circle to represent fixation duration |
|
def draw_semitransp_fill_fixation_circle(frame, fixations, overlay, color, alpha):
    """Draw filled fixation circles on ``overlay`` and blend it into ``frame``.

    Args:
        frame: Image the overlay is blended with.
        fixations: Dict of parallel lists. One circle is drawn per entry of
            ``fixations["n"]``, centred at ``(fixations["x"][k],
            fixations["y"][k])`` with radius ``int(fixations["d"][k])``.
        overlay: Image the circles are drawn on; it is modified in place.
        color: Circle color passed to ``cv2.circle``.
        alpha: Weight of ``overlay`` in the blend; ``frame`` gets ``1 - alpha``.

    Returns:
        The blended image returned by ``cv2.addWeighted``.
    """
    for k in range(len(fixations["n"])):
        cv2.circle(
            img=overlay,
            center=(fixations["x"][k], fixations["y"][k]),
            radius=int(fixations["d"][k]),
            color=color,
            thickness=-1,  # negative thickness draws a filled circle
            lineType=cv2.LINE_AA,
        )
    # Blend the overlay, with all circles drawn on it, into the frame.
    frame = cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)

    return frame
|
|
|
|
|
# draw the outline for the fixation circle |
|
def draw_outline_fixation_circle(frame, fixations, color):
    """Draw a 3 px outline for every stored fixation directly on ``frame``.

    Args:
        frame: Image that is drawn on in place.
        fixations: Dict of parallel lists; one circle per entry of
            ``fixations["n"]``, using "x"/"y" as centre and "d" as radius.
        color: Outline color passed to ``cv2.circle``.
    """
    for k, _ in enumerate(fixations["n"]):
        centre = (fixations["x"][k], fixations["y"][k])
        radius = int(fixations["d"][k])
        cv2.circle(
            img=frame,
            center=centre,
            radius=radius,
            color=color,
            thickness=3,
            lineType=cv2.LINE_AA,
        )
|
|
|
|
|
# draw scanpath lines connecting consecutive fixation circles
|
def draw_scanpath_lines(frame, fixations, color):
    """Connect each stored fixation to the one before it with a 3 px line.

    Args:
        frame: Image that is drawn on in place.
        fixations: Dict of parallel lists; the "x"/"y" entries give the centre
            of each fixation listed in ``fixations["n"]``.
        color: Line color passed to ``cv2.line``.
    """
    centres = [
        (int(fixations["x"][k]), int(fixations["y"][k]))
        for k in range(len(fixations["n"]))
    ]
    # Pair every centre with its predecessor; the first one has none.
    for previous, current in zip(centres, centres[1:]):
        cv2.line(img=frame, pt1=current, pt2=previous, color=color, thickness=3)
|
|
|
|
|
# draw fixation nr, black nr with white outline |
|
def draw_fixation_nr(frame, fixations, font, font_size, font_thick_b, font_thick_w):
    """Write each fixation number centred on its circle.

    Every number is drawn twice: first in white with thickness
    ``font_thick_w``, then in black with thickness ``font_thick_b`` on top.

    Args:
        frame: Image that is drawn on in place.
        fixations: Dict of parallel lists; "n" holds the numbers and "x"/"y"
            the circle centres.
        font: OpenCV font face.
        font_size: OpenCV font scale.
        font_thick_b: Thickness of the black text; also used to measure it.
        font_thick_w: Thickness of the white text drawn underneath.
    """
    passes = (((255, 255, 255), font_thick_w), ((0, 0, 0), font_thick_b))
    for k, number in enumerate(fixations["n"]):
        label = f"{number}"
        centre_x, centre_y = fixations["x"][k], fixations["y"][k]
        (label_width, label_height), _baseline = cv2.getTextSize(
            label, font, font_size, font_thick_b
        )
        # Shift the origin so the label is centred on the fixation.
        text_origin = (
            int(centre_x - label_width / 2),
            int(centre_y + label_height / 2),
        )
        for ink, thickness in passes:
            cv2.putText(
                img=frame,
                text=label,
                org=text_origin,
                fontFace=font,
                fontScale=font_size,
                color=ink,
                thickness=thickness,
                lineType=cv2.LINE_AA,
            )
|
|
|
|
|
# draw legend with corresponding scanpath color and subject name |
|
def draw_name_legend(frame, color, subj, font, font_size, font_thick_b, font_thick_w):
    """Draw a one-entry legend: white box, colored line sample, subject name.

    All positions are derived from the frame width. ``font_thick_w`` is
    accepted but not used here.

    Args:
        frame: Image that is drawn on in place.
        color: Color of the line sample.
        subj: Text written next to the line sample.
        font: OpenCV font face.
        font_size: OpenCV font scale.
        font_thick_b: Thickness used when measuring the text.
        font_thick_w: Unused.
    """
    w = frame.shape[1]

    # White background box.
    box_left, box_top = int(w - (w / 5)), int(w / 50)
    box_right, box_bottom = int(w - (w / 50)), int(w / 25)
    box_height = box_bottom - box_top

    # Horizontal line sample at half the box height.
    line_y = int(box_top + box_height / 2)
    line_start = (int(box_left + box_height / 2), line_y)
    line_end = (int(box_left + box_height * 2), line_y)

    cv2.rectangle(
        img=frame,
        pt1=(box_left, box_top),
        pt2=(box_right, box_bottom),
        color=(255, 255, 255),
        thickness=-1,
        lineType=cv2.LINE_AA,
    )
    cv2.line(
        img=frame,
        pt1=line_start,
        pt2=line_end,
        color=color,
        thickness=3,
        lineType=cv2.LINE_AA,
    )

    label = f"{subj}"
    (label_width, label_height), _baseline = cv2.getTextSize(
        label, font, font_size, font_thick_b
    )
    text_origin = (
        int(line_end[0] + label_width / 4),
        int(line_end[1] + label_height / 2),
    )
    cv2.putText(
        img=frame,
        text=label,
        org=text_origin,
        fontFace=font,
        fontScale=font_size,
        color=(0, 0, 0),
        thickness=1,
        lineType=cv2.LINE_AA,
    )
|
|
|
|
|
# draw a combined legend listing every subject name next to its scanpath color
|
def draw_all_names_legend(
    frame, colors, names, font, font_size, font_thick_b, font_thick_w
):
    """Draw a legend with one row (line sample and name) per subject.

    A white box is drawn at the top right of the frame, then one row per entry
    of ``names``. ``font_thick_w`` is accepted but not used here.

    Args:
        frame: Image that is drawn on in place.
        colors: Mapping of subject name to line color.
        names: Subject names, one legend row each.
        font: OpenCV font face.
        font_size: OpenCV font scale.
        font_thick_b: Thickness used when measuring the text.
        font_thick_w: Unused.
    """
    w = frame.shape[1]
    box_left, box_top = int(w - (w / 5)), 0
    box_right, box_bottom = int(w - (w / 50)), int(w / 50) * len(names)
    row_height = int(w / 50) - box_top

    cv2.rectangle(
        img=frame,
        pt1=(box_left, box_top),
        pt2=(box_right, box_bottom),
        color=(255, 255, 255),
        thickness=-1,
        lineType=cv2.LINE_AA,
    )

    for row, subj in enumerate(names):
        # Each row sits one row height below the previous one.
        offset = row * row_height
        line_y = int(box_top + row_height / 2 + offset)
        line_start = (int(box_left + row_height / 2), line_y)
        line_end = (int(box_left + row_height * 2), line_y)
        cv2.line(
            img=frame,
            pt1=line_start,
            pt2=line_end,
            color=colors[subj],
            thickness=3,
            lineType=cv2.LINE_AA,
        )

        label = f"{subj}"
        (label_width, label_height), _baseline = cv2.getTextSize(
            label, font, font_size, font_thick_b
        )
        text_origin = (
            int(line_end[0] + label_width / 4),
            int(line_end[1] + label_height / 2),
        )
        cv2.putText(
            img=frame,
            text=label,
            org=text_origin,
            fontFace=font,
            fontScale=font_size,
            color=(0, 0, 0),
            thickness=1,
            lineType=cv2.LINE_AA,
        )
|
|
|
|
|
# draw red gaze overlay |
|
def draw_gaze_overlay(frame, xg, yg, i):
    """Draw the gaze marker (a circle outline of radius 50) for sample ``i``.

    Args:
        frame: Image that is drawn on in place.
        xg: Indexable of gaze x coordinates.
        yg: Indexable of gaze y coordinates.
        i: Index of the sample to draw.
    """
    gaze_point = (int(xg[i]), int(yg[i]))
    cv2.circle(
        img=frame,
        center=gaze_point,
        radius=50,
        color=(0, 0, 255),
        thickness=10,
    )
|
|
|
|
|
# main function: read each frame of the reference video, draw the scanpath and gaze overlay on it, and write the processed frame to the scanpath video
|
def draw_on_frame(
    rim_data, ref_video_path, scan_video_path, subj, colors, fixations_df, path_to_scanpath
):
    """Render one subject's scanpath video from the static reference video.

    Frame ``i`` of the reference video is paired with the subject's ``i``-th
    gaze row. When that row belongs to a fixation flagged as detected in the
    reference image, the fixation is appended to the running scanpath. Every
    frame then gets the scanpath drawn so far (filled circles, outlines,
    connecting lines, numbers), the name legend and, when a gaze position is
    available, the gaze marker, and is written to ``scan_video_path``.

    After the last frame the accumulated fixations are stored in
    ``fixations_df`` under ``subj``, also when the video holds fewer frames
    than there are gaze rows.

    Args:
        rim_data: Dict whose "gaze.csv" entry holds the prepared gaze frame.
        ref_video_path: Path of the static reference video to read.
        scan_video_path: Path of the scanpath video to write.
        subj: Value of the "names" column selecting the subject.
        colors: Mapping of subject name to drawing color.
        fixations_df: Dict that receives the subject's accumulated fixations.
        path_to_scanpath: Folder passed on to ``save_scanpath_image``.
    """
    # text aesthetics
    font = cv2.FONT_HERSHEY_DUPLEX
    font_size = 1
    font_thick_w = 3
    font_thick_b = 1

    # scanpath accumulated across frames
    fixations = {"x": [], "y": [], "n": [], "d": [], "refimg": []}

    # Filter the subject's rows once; the positional lookups below then work on
    # this frame directly instead of re-filtering it for every video frame.
    gaze = rim_data["gaze.csv"][0]
    gaze = gaze[gaze["names"] == subj].reset_index(drop=True)

    # Columns are read by position. Presumably 4/5 are the gaze x/y in the
    # reference image, 6 the fixation id and 8-11 the "fix x", "fix y",
    # "fix d" and "fix in refimg" columns - TODO confirm against the export.
    xg = gaze.iloc[:, 4]
    yg = gaze.iloc[:, 5]

    frame_nr = len(gaze)
    last_fix_id = int(np.nanmax(gaze[~gaze["fix x"].isna()]["fixation id"]))

    progress_bar = tqdm(total=frame_nr, desc=f"Processing {subj}_scanpath.mp4")
    i = 0
    try:
        with capture_frame(ref_video_path) as cap, Writer_frame(
            scan_video_path, cv2.VideoWriter_fourcc(*"mp4v"), cap
        ) as scanpath_video:
            while i < frame_nr:
                ret, frame = cap.read()
                if not ret:
                    break
                # copy of the frame used as overlay for the semi-transparent circles
                overlay = frame.copy()

                # a fixation is detected, and it's on the reference image
                fixation_id = gaze.iloc[i, 6]
                on_reference_image = gaze.iloc[i, 11]
                if not math.isnan(fixation_id) and on_reference_image:
                    fixations["x"].append(int(gaze.iloc[i, 8]))
                    fixations["y"].append(int(gaze.iloc[i, 9]))
                    fixations["n"].append(int(fixation_id))
                    fixations["d"].append(int(gaze.iloc[i, 10]))
                    fixations["refimg"].append(on_reference_image)

                frame = draw_semitransp_fill_fixation_circle(
                    frame, fixations, overlay, colors[subj], 0.5
                )
                draw_outline_fixation_circle(frame, fixations, colors[subj])
                draw_scanpath_lines(frame, fixations, colors[subj])
                draw_fixation_nr(
                    frame, fixations, font, font_size, font_thick_b, font_thick_w
                )
                draw_name_legend(
                    frame,
                    colors[subj],
                    subj,
                    font,
                    font_size,
                    font_thick_b,
                    font_thick_w,
                )
                # Save the scanpath when last fix is drawn, before gaze overlay
                save_scanpath_image(fixations, frame, subj, last_fix_id, path_to_scanpath)
                # gaze overlay, only when gaze was detected on reference image
                if not math.isnan(xg[i]):
                    draw_gaze_overlay(frame, xg, yg, i)

                scanpath_video.write(frame)
                progress_bar.update(1)
                i += 1
    finally:
        progress_bar.close()

    # Stored after the loop so a video that is a frame short of the gaze rows
    # still contributes to the aggregated scanpath.
    store_fixations_for_all_subjects(fixations_df, fixations, subj)
    logging.info(f"{subj}_scanpath.mp4 saved!")
|
|
|
|
|
# Create and save scanpath image |
|
def save_scanpath_image(fixations, frame, subj, last_fix_id, path_to_scanpath):
    """Save the frame as ``<subj>_scanpath.jpeg`` once the last fixation is drawn.

    Nothing is written while the newest stored fixation number differs from
    ``last_fix_id``, or when the image already exists in ``path_to_scanpath``.

    Args:
        fixations: Dict whose "n" list holds the fixation numbers drawn so far.
        frame: Image to save.
        subj: Subject name used in the file name.
        last_fix_id: Number of the subject's final fixation.
        path_to_scanpath: Folder the image is checked for and written to.
    """
    drawn = fixations["n"]
    if not drawn or drawn[-1] != last_fix_id:
        return

    # Check and write the same path, so the result does not depend on the
    # current working directory.
    image_path = os.path.join(path_to_scanpath, f"{subj}_scanpath.jpeg")
    if os.path.exists(image_path):
        return

    cv2.imwrite(image_path, frame)
    logging.info(f"{subj}_scanpath.jpeg saved!")
|
|
|
|
|
def store_fixations_for_all_subjects(fixations_df, fixations, subj):
    """Store a subject's fixations as a one-element list under ``subj``.

    Any value previously stored for ``subj`` is replaced.
    """
    fixations_df[subj] = [fixations]
|
|
|
|
|
def create_aggregated_scanpaths(fixations_df, colors, rim_data, names):
    """Draw every subject's scanpath on one image and save it.

    The result is written as "general_scanpath.jpeg" relative to the current
    working directory.

    Args:
        fixations_df: Dict mapping each subject to a one-element list holding
            its fixation dict.
        colors: Mapping of subject name to drawing color.
        rim_data: Dict whose "reference_image.jpeg" entry holds the image.
        names: Subjects to draw, in drawing order.
    """
    # text aesthetics
    font = cv2.FONT_HERSHEY_DUPLEX
    font_size = 1
    font_thick_w = 3
    font_thick_b = 1
    # load reference image
    ref_img = rim_data["reference_image.jpeg"][0]
    # NOTE(review): scale_coordinates stores this image after a BGR->RGB
    # conversion; converting again presumably restores the BGR order that
    # cv2.imwrite expects - confirm.
    ref_img = cv2.cvtColor(ref_img, cv2.COLOR_BGR2RGB)

    for subj in names:
        # Each subject is blended onto the image that already holds the
        # previously drawn subjects.
        overlay = ref_img.copy()
        ref_img = draw_semitransp_fill_fixation_circle(
            ref_img, fixations_df[subj][0], overlay, colors[subj], 0.5
        )
        draw_outline_fixation_circle(ref_img, fixations_df[subj][0], colors[subj])
        draw_scanpath_lines(ref_img, fixations_df[subj][0], colors[subj])
    # The legend lists all subjects.
    draw_all_names_legend(
        ref_img, colors, names, font, font_size, font_thick_b, font_thick_w
    )

    cv2.imwrite("general_scanpath.jpeg", ref_img)
    logging.info("general_scanpath.jpeg saved!")
|
|
|
|
|
def remove_ref_video_folder(rim_folder):
    """Delete the temporary ``<rim_folder>/ref_video`` folder and its content.

    The folder is located directly rather than through ``path_to_ref_video``,
    which would change into it (and create it when missing) right before the
    deletion. If the process is currently inside the folder, it first moves to
    ``rim_folder``. A missing folder is not an error.

    Args:
        rim_folder: Folder that contains the "ref_video" sub-folder.
    """
    ref_folder = os.path.join(rim_folder, "ref_video")
    if not os.path.isdir(ref_folder):
        return

    target = os.path.normcase(os.path.realpath(ref_folder))
    try:
        current = os.path.normcase(os.path.realpath(os.getcwd()))
    except OSError:
        # The working directory no longer exists; nothing to step out of.
        current = None

    # Some platforms refuse to delete the current working directory.
    if current is not None and (
        current == target or current.startswith(target + os.sep)
    ):
        os.chdir(rim_folder)

    shutil.rmtree(ref_folder)
|
|
|
|
|
def main():
    """Run the scanpath pipeline for a user-selected export folder.

    Steps: select and load the export, prepare the gaze/fixation tables, let
    the user choose the circle scale, then render one scanpath video and image
    per subject, plus an aggregated image when there is more than one subject.
    The temporary reference videos are removed at the end.
    """
    rim_folder = select_folder()
    filepaths = paths_to_rim_files(rim_folder)
    scan_recording_id = check_sections_csv(filepaths)
    rim_data = reset_fixation_id(filepaths)
    set_name(rim_data, filepaths)
    add_fix_xyd(rim_data)
    scaler = CircleScaler(
        str(filepaths["reference_image.jpeg"]),
        rim_data,
    )
    scale_factor = scaler.run()
    scale_coordinates(rim_data, filepaths, scale_factor)
    fixations = rim_data["fixations.csv"][0]

    # Keep only subjects with at least one fixation detected in the reference
    # image: a subject is dropped when the flag has a single distinct value
    # and that value is False. Rows without a name are ignored.
    detected_column = "fixation detected in reference image"
    names = []
    for name in fixations["names"].dropna().unique():
        detected = fixations.loc[fixations["names"] == name, detected_column]
        never_detected = detected.nunique() == 1 and detected.eq(False).all()
        if not never_detected:
            names.append(name)

    # Exclude the subject whose name or recording id equals the scanning
    # recording id.
    # NOTE(review): check_sections_csv currently returns True/False/None, never
    # an actual id, so no subject is excluded yet - confirm the intended input.
    if scan_recording_id is not None and not isinstance(scan_recording_id, bool):
        recording_of = dict(zip(fixations["names"], fixations["recording id"]))
        names = [
            name
            for name in names
            if scan_recording_id not in (name, recording_of.get(name))
        ]

    colors = color_generator(names)
    path_to_scanpath = path_to_scanpath_video(rim_folder)
    fixations_df = {}
    for subj in names:
        create_ref_video(rim_data, rim_folder, subj)
        ref_video_path = os.path.join(
            path_to_ref_video(rim_folder),
            f"{subj}_ref.mp4",
        )
        # Must be the last path helper called before drawing: it leaves the
        # working directory in the scanpath folder, where the images that are
        # written with relative file names end up.
        scan_video_path = os.path.join(
            path_to_scanpath_video(rim_folder),
            f"{subj}_scanpath.mp4",
        )

        draw_on_frame(
            rim_data,
            ref_video_path,
            scan_video_path,
            subj,
            colors,
            fixations_df,
            path_to_scanpath,
        )
    if len(names) > 1:
        create_aggregated_scanpaths(fixations_df, colors, rim_data, names)
    remove_ref_video_folder(rim_folder)
|
|
|
# Run the pipeline only when executed as a script, so importing this module
# (for example to reuse a drawing helper) does not open the folder dialog.
if __name__ == "__main__":
    main()