|
import player_methods as pm |
|
from blink_detection import Offline_Blink_Detection |
|
from pyglui import ui |
|
from bisect import bisect_left, bisect_right |
|
|
|
import csv |
|
import logging |
|
import os |
|
from collections import deque |
|
|
|
import numpy as np |
|
import OpenGL.GL as gl |
|
import pyglui.cygl.utils as cygl_utils |
|
from pyglui import ui |
|
from pyglui.pyfontstash import fontstash as fs |
|
from scipy.signal import fftconvolve |
|
|
|
import csv_utils |
|
import data_changed |
|
import file_methods as fm |
|
import gl_utils |
|
import player_methods as pm |
|
from observable import Observable |
|
from plugin import Plugin |
|
from pupil_recording import PupilRecording, RecordingInfo |
|
|
|
# Module-level logger named after this module so plugin output can be
# filtered in the Pupil Player log.
logger = logging.getLogger(__name__)
|
|
|
|
|
class Editable_Blink_Detection(Offline_Blink_Detection):
    """Offline blink detector whose detections can be reviewed and curated.

    Extends ``Offline_Blink_Detection`` with:

    * quickbar thumbs (hotkeys ``b`` / ``B``) that seek playback to the
      next / previous detected blink (wrapping around the recording),
    * per-blink "Should export" switches shown in the plugin menu while a
      blink overlaps the currently displayed frame,
    * an export step that omits deselected blinks from ``blinks.csv``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # World frame index of the most recently displayed frame; used as the
        # reference point for next/previous blink navigation.
        self._last_frame_idx = 0
        # Blinks overlapping the currently displayed frame, cached so the
        # per-blink menu entries are only rebuilt when the set changes.
        self._recent_blinks = []
        # _should_export_lut[id - 1] -> bool: whether blink `id` is exported.
        self._should_export_lut = []
        # Number of menu elements created by the base-class UI; everything
        # after this index is a dynamically added per-blink entry.
        # None until init_ui() has run.
        self._num_default_ui_elements = None

    @classmethod
    def parse_pretty_class_name(cls) -> str:
        """Name displayed in the plugin list."""
        return "Blink Detector (Editable)"

    def init_ui(self):
        """Build the base-class UI, then add blink-navigation quickbar thumbs."""
        super().init_ui()

        def jump_next_blink(_):
            self._seek_to_adjacent_blink(forward=True)

        def jump_prev_blink(_):
            self._seek_to_adjacent_blink(forward=False)

        next_thumb = ui.Thumb(
            "next_blink",
            getter=lambda: False,
            setter=jump_next_blink,
            label="b",
            hotkey="b",
        )
        next_thumb.status_text = "Next Blink"
        self.g_pool.quickbar.append(next_thumb)

        prev_thumb = ui.Thumb(
            "previous_blink",
            getter=lambda: False,
            setter=jump_prev_blink,
            label="B",
            hotkey="B",
        )
        prev_thumb.status_text = "Previous Blink"
        self.g_pool.quickbar.append(prev_thumb)

        self._num_default_ui_elements = len(self.menu)

    def _seek_to_adjacent_blink(self, forward: bool):
        """Seek playback to the blink after/before the current frame.

        Wraps around at either end of the recording. Logs a warning and does
        nothing when no blinks are available.
        """
        all_idc = [b["index"] for b in self.g_pool.blinks]
        if not all_idc:
            logger.warning("No blinks available")
            return
        cur_idx = self._last_frame_idx
        if forward:
            # wrap-around index
            target_blink = bisect_right(all_idc, cur_idx) % len(all_idc)
        else:
            # wrap-around index
            target_blink = (bisect_left(all_idc, cur_idx) - 1) % len(all_idc)
        self.notify_all(
            {
                "subject": "seek_control.should_seek",
                "index": int(self.g_pool.blinks[target_blink]["index"]),
            }
        )

    def recent_events(self, events):
        """Track the current frame and maintain per-blink menu entries."""
        frame = events.get("frame")
        if not frame:
            return

        self._last_frame_idx = frame.index

        frame_window = pm.enclosing_window(self.g_pool.timestamps, frame.index)
        current_blinks = self.g_pool.blinks.by_ts_window(frame_window).tolist()

        if current_blinks == self._recent_blinks:
            return  # menu already reflects the blinks on screen
        self._recent_blinks = current_blinks

        if self._num_default_ui_elements is None:
            return  # init_ui() has not run yet

        # Drop per-blink entries belonging to the previously shown frame.
        del self.menu[self._num_default_ui_elements :]
        for blink in current_blinks:
            self.menu.append(
                ui.Info_Text(
                    f"Blink {blink['id']} of {len(self.g_pool.blinks)} | "
                    f"Duration: {blink['duration']:.3f} seconds"
                )
            )

            # BUGFIX: bind the blink id per iteration via a default argument.
            # The previous closures captured the loop variable `blink` late,
            # so with more than one blink on screen every "Should export"
            # switch read and wrote the LAST blink's LUT entry.
            def _getter(blink_id=blink["id"]):
                try:
                    return self._should_export_lut[blink_id - 1]
                except IndexError:
                    # use the module logger (was logging.debug -> root logger)
                    logger.debug(
                        f"Could not retrieve `should export` for {blink_id} "
                        f"(num lut entries: {len(self._should_export_lut)})"
                    )
                    return False

            def _setter(val: bool, blink_id=blink["id"]):
                try:
                    self._should_export_lut[blink_id - 1] = val
                except IndexError:
                    logger.debug(
                        f"Could not set `should export` for {blink_id} "
                        f"(num lut entries: {len(self._should_export_lut)})"
                    )

            self.menu.append(ui.Switch("Should export", getter=_getter, setter=_setter))

    def consolidate_classifications(self):
        """Convert the per-sample classification into discrete blink events.

        Scans ``self.response_classification`` with a small state machine
        (states: 'no blink' -> 'blink started' -> 'blink ending') and stores
        the resulting blinks in ``self.g_pool.blinks`` as a ``pm.Affiliator``
        for fast lookup by time window. Also resets the export LUT so every
        blink is exported by default.
        """
        blink = None
        state = "no blink"  # others: 'blink started' | 'blink ending'
        blink_data = deque()
        blink_start_ts = deque()
        blink_stop_ts = deque()
        counter = 1

        # NOTE: Cache result for performance reasons
        pupil_data = self._pupil_data()

        def start_blink(idx):
            # Open a new blink event starting at pupil datum `idx`.
            nonlocal blink
            nonlocal state
            nonlocal counter
            blink = {
                "topic": "blink",
                "__start_response_index__": idx,
                "start_timestamp": self.timestamps[idx],
                "id": counter,
            }
            state = "blink started"
            counter += 1

        def blink_finished(idx):
            # Close the currently open blink at pupil datum `idx` and store it.
            nonlocal blink

            # get tmp pupil idx
            start_idx = blink["__start_response_index__"]
            del blink["__start_response_index__"]

            blink["end_timestamp"] = self.timestamps[idx]
            blink["timestamp"] = (blink["end_timestamp"] + blink["start_timestamp"]) / 2
            blink["duration"] = blink["end_timestamp"] - blink["start_timestamp"]
            blink["base_data"] = pupil_data[start_idx:idx].tolist()
            blink["filter_response"] = self.filter_response[start_idx:idx].tolist()
            # blink confidence is the mean of the absolute filter response
            # during the blink event, clamped at 1.
            blink["confidence"] = min(
                float(np.abs(blink["filter_response"]).mean()), 1.0
            )

            # correlate world indices
            ts_start, ts_end = blink["start_timestamp"], blink["end_timestamp"]

            idx_start, idx_end = np.searchsorted(
                self.g_pool.timestamps, [ts_start, ts_end]
            )
            # fix `list index out of range` error
            idx_end = min(idx_end, len(self.g_pool.timestamps) - 1)
            blink["start_frame_index"] = int(idx_start)
            blink["end_frame_index"] = int(idx_end)
            blink["index"] = int((idx_start + idx_end) // 2)

            blink_data.append(fm.Serialized_Dict(python_dict=blink))
            blink_start_ts.append(ts_start)
            blink_stop_ts.append(ts_end)

        for idx, classification in enumerate(self.response_classification):
            if state == "no blink" and classification > 0:
                start_blink(idx)
            elif state == "blink started" and classification == -1:
                state = "blink ending"
            elif state == "blink ending" and classification >= 0:
                blink_finished(idx - 1)  # blink ended previously
                if classification > 0:
                    # BUGFIX: a back-to-back blink starts at the CURRENT
                    # sample. Previously this called start_blink(0), dating
                    # the new blink to the very start of the recording and
                    # producing a bogus start_timestamp/duration.
                    start_blink(idx)
                else:
                    blink = None
                    state = "no blink"

        if state == "blink ending":
            # only finish blink if it was already ending
            blink_finished(idx)  # idx is the last possible idx

        self.g_pool.blinks = pm.Affiliator(blink_data, blink_start_ts, blink_stop_ts)
        self._should_export_lut = [True] * len(blink_data)
        self.notify_all({"subject": "blinks_changed", "delay": 0.2})

    def recalculate(self):
        """Recompute the filter response, classification, and blink events."""
        import time  # local import: only used for timing diagnostics

        t0 = time.perf_counter()
        all_pp = self._pupil_data()
        if not all_pp:
            # No pupil data: clear all derived state and publish empty blinks.
            self.filter_response = []
            self.response_classification = []
            self.timestamps = []
            self.consolidate_classifications()
            return

        self.timestamps = all_pp.timestamps
        # NOTE: previously computed twice (once from self.timestamps, once
        # from all_pp timestamps) with identical results.
        total_time = self.timestamps[-1] - self.timestamps[0]

        conf_iter = (pp["confidence"] for pp in all_pp)
        activity = np.fromiter(conf_iter, dtype=float, count=len(all_pp))

        # Filter length in samples covering `history_length` seconds, rounded
        # to an even number. Guard degenerate recordings (single datum / zero
        # duration) which would otherwise raise ZeroDivisionError, and keep
        # the filter at least 2 samples wide so it is never empty.
        if total_time <= 0:
            filter_size = 2
        else:
            filter_size = 2 * round(
                len(all_pp) * self.history_length / total_time / 2.0
            )
            filter_size = max(filter_size, 2)
        blink_filter = np.ones(filter_size) / filter_size

        # This is different from the online filter. Convolution will flip
        # the filter and result in a reverse filter response. Therefore
        # we set the first half of the filter to -1 instead of the second
        # half such that we get the expected result.
        blink_filter[: filter_size // 2] *= -1

        # The theoretical response maximum is +-0.5
        # Response of +-0.45 seems sufficient for a confidence of 1.
        self.filter_response = fftconvolve(activity, blink_filter, "same") / 0.45

        onsets = self.filter_response > self.onset_confidence_threshold
        offsets = self.filter_response < -self.offset_confidence_threshold

        self.response_classification = np.zeros(self.filter_response.shape)
        self.response_classification[onsets] = 1.0
        self.response_classification[offsets] = -1.0

        self.consolidate_classifications()

        tm1 = time.perf_counter()
        logger.debug(
            "Recalculating took\n\t{:.4f}sec for {} pp\n\t{} pp/sec\n\tsize: {}".format(
                tm1 - t0, len(all_pp), len(all_pp) / (tm1 - t0), filter_size
            )
        )

    def on_notify(self, notification):
        """React to recalculation requests, blink changes, and export requests."""
        if notification["subject"] == "blink_detection.should_recalculate":
            self.recalculate()
        elif notification["subject"] == "blinks_changed":
            self.cache_activation()
            try:
                self.timeline.refresh()
            except AttributeError:
                # timeline does not exist yet (UI not initialized)
                pass
        elif notification["subject"] == "should_export":
            self.export(notification["ts_window"], notification["export_dir"])

    def export(self, export_window, export_dir):
        """
        Between in and out mark

            blink_detection_report.csv:
                - history length
                - onset threshold
                - offset threshold

            blinks.csv:
                id | start_timestamp | duration | end_timestamp |
                start_frame_index | index | end_frame_index |
                confidence | filter_response | base_data

        Blinks whose "Should export" switch was turned off are skipped.
        """
        if not self.g_pool.blinks:
            logger.warning(
                "No blinks were detected in this recording. Nothing to export."
            )
            return

        header = (
            "id",
            "start_timestamp",
            "duration",
            "end_timestamp",
            "start_frame_index",
            "index",
            "end_frame_index",
            "confidence",
            "filter_response",
            "base_data",
        )

        blinks_in_section = self.g_pool.blinks.by_ts_window(export_window)
        num_skipped = 0

        with open(
            os.path.join(export_dir, "blinks.csv"), "w", encoding="utf-8", newline=""
        ) as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow(header)
            for b in blinks_in_section:
                # Blinks without a LUT entry (e.g. stale LUT after a reload)
                # default to being exported instead of raising IndexError.
                try:
                    should_export = self._should_export_lut[b["id"] - 1]
                except IndexError:
                    should_export = True
                if not should_export:
                    num_skipped += 1
                    continue
                csv_writer.writerow(self.csv_representation_for_blink(b, header))
        logger.info(
            f"Created 'blinks.csv' file. Skipped {num_skipped} of "
            f"{len(blinks_in_section)} blinks."
        )

        with open(
            os.path.join(export_dir, "blink_detection_report.csv"),
            "w",
            encoding="utf-8",
            newline="",
        ) as csvfile:
            csv_utils.write_key_value_file(
                csvfile,
                {
                    "history_length": self.history_length,
                    "onset_confidence_threshold": self.onset_confidence_threshold,
                    "offset_confidence_threshold": self.offset_confidence_threshold,
                    "blinks_exported": len(blinks_in_section) - num_skipped,
                },
            )
        logger.info("Created 'blink_detection_report.csv' file.")