Dual-Display HMD Calibration Choreography

The components below allow Pupil Core to calibrate gaze for head-mounted displays with a separate display for each eye. As a result, Pupil Core can estimate gaze within a custom target coordinate system per eye. This differs from the dual-monocular calibrations, which calibrate each eye separately but assume the target coordinate system to be the same for both eyes.

There are three major components:

  • Pupil Core choreography plugin - responsible for collecting pupil and reference data, and for initializing the gazer once the calibration choreography has finished
  • Pupil Core gazer plugin - responsible for matching reference and pupil data and for performing the actual calibration
  • Calibration client - starts and stops the calibration and provides reference data to the choreography plugin

Plugins

Both plugins, external_calibration_choreography_plugin.py and gazer_dual_display.py, need to be installed in the corresponding plugin folder. They are not listed in the UI until they are explicitly started via the client script.

Choreography

The choreography expects to receive reference data via the ref_data field of the notify.calibration.add_ref_data notification.
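
For orientation, the notification payload mirrors what the bundled client sends (the ref_data entries follow the format described in the Gazer section below):

{
    "subject": "calibration.add_ref_data",
    "ref_data": [...],  # list of reference datums, see the Gazer section
}

On the wire it is published under the topic notify.calibration.add_ref_data; the client's _send_notification helper below shows how the topic is derived from the subject.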

Gazer

The dual-display gazer assumes the reference data to have the following format:

{
    "norm_pos": location_coord,  # normalized coordinates
    "timestamp": local_clock_function() + clock_offset,  # timestamp in Pupil time
    "eye_id": 1,  # left eye, set to 0 for right eye
}

The calibration will fail if the reference datum is missing any of the fields above.

The gazer calibrates two monocular 2D models (via polynomial regression) based on the collected pupil and reference data. Pupil and reference data are only matched if

  • they have matching eye ids
  • their timestamps are no more than 1/15 of a second apart
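
As a rough illustration only (the shipped gazer filters by eye id in match_pupil_to_ref and delegates the temporal matching to closest_matches_monocular_batch), the matching criterion amounts to:

# Illustrative sketch of the matching criterion, not the gazer's actual code.
MAX_TIMESTAMP_DISTANCE_SECONDS = 1 / 15

def is_match(pupil_datum, ref_datum):
    same_eye = pupil_datum["id"] == ref_datum["eye_id"]
    close_in_time = (
        abs(pupil_datum["timestamp"] - ref_datum["timestamp"])
        <= MAX_TIMESTAMP_DISTANCE_SECONDS
    )
    return same_eye and close_in_time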

Normalized coordinate systems

The normalized coordinate system has its origin in the bottom left. The maximum height and width of the reference system correspond to the value 1.0.

Pixel coordinates can be converted to normalized coordinates with the following formula:

x_norm = x_pixel / width_pixels
y_norm = 1.0 - (y_pixel / height_pixels)

And vice versa:

x_pixel = int(x_norm * width_pixels)
y_pixel = int((1.0 - y_norm) * height_pixels)
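
As a sketch, the two conversions could be wrapped in helper functions like these (the names are illustrative):

def pixel_to_norm(x_pixel, y_pixel, width_pixels, height_pixels):
    # The normalized origin is in the bottom left, so the y axis is flipped.
    return x_pixel / width_pixels, 1.0 - (y_pixel / height_pixels)

def norm_to_pixel(x_norm, y_norm, width_pixels, height_pixels):
    return int(x_norm * width_pixels), int((1.0 - y_norm) * height_pixels)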

Client

The client (dual_display_choreography_client.py) performs the following steps:

  1. Connects to the Pupil Core application
  2. Subscribes to notify.calibration
  3. Calculates the clock offset between its local and the application's Pupil clock
  4. Enables the external calibration choreography plugin
  5. Selects the dual-display gazer
  6. Starts the calibration (and waits for a confirmed start)
  7. Performs the choreography
  8. Stops the calibration
  9. Waits for the calibration to either fail or succeed

Choreography

The client does not display any stimuli. Instead, it asks the subject to look at specific points in their field of view (e.g. the top left) for one second each.

This can be replaced by an implementation that shows a visual stimulus and uses its location as the reference location.
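
A stimulus-based variant could be sketched as follows; draw_marker stands in for whatever rendering call your HMD setup provides and is not part of this gist:

import time

def collect_refs_while_showing_stimulus(
    location_norm, eye_id, clock_function, clock_offset,
    duration_seconds=1.0, sampling_rate_hz=30,
):
    # Sketch only: draw_marker() is a placeholder for your own rendering code.
    ref_data = []
    for _ in range(int(duration_seconds * sampling_rate_hz)):
        draw_marker(location_norm)  # render the target on the per-eye display
        ref_data.append(
            {
                "norm_pos": location_norm,
                "timestamp": clock_function() + clock_offset,
                "eye_id": eye_id,
            }
        )
        time.sleep(1 / sampling_rate_hz)
    return ref_data

The resulting ref_data list would then be sent with a calibration.add_ref_data notification, exactly as the bundled client does.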

Dependencies

The Python dependencies can be found in the requirements-client.txt file and can be installed via the following command:

pip install -r requirements-client.txt
dual_display_choreography_client.py

import logging
import time

import click
import msgpack
import zmq

logger = logging.getLogger(__name__)


@click.command()
@click.option("--ip", default="127.0.0.1", help="Pupil Capture or Service address")
@click.option("--port", default=50020, help="Pupil Remote port")
def main(ip, port):
    # Connect to Pupil Remote
    control_socket = pupil_remote_connection(ip, port)
    sub_socket = get_calibration_notification_socket(control_socket, ip)

    # Define local clock -
    # required to calculate accurate timestamps for the reference data
    clock_function = time.monotonic

    # Get offset between local clock and Pupil clock
    clock_offset = _measure_clock_offset(control_socket, clock_function)
    logger.info(f"Measured clock offset: {clock_offset} seconds")

    # If the specific gazer class name is not found, Pupil Capture will fall back
    # to the default gazer class (3D calibration)
    gazer_class_name = "GazerDualDisplay2D"
    logger.info(
        "Enable ExternalCalibrationChoreography plugin with gazer class "
        + gazer_class_name
    )
    enable_external_choreography_plugin_selecting_gazer(
        control_socket, gazer_class_name
    )

    # Wait for user confirmation
    input("Press enter to start calibration")

    # Before starting the calibration, make sure that no older calibration
    # notifications are queued in the socket. This is especially important if you
    # want to repeat the calibration with the same subscription socket.
    clear_socket_queue(sub_socket)

    # Start calibration
    control_socket.send_string("C")  # upper case C
    control_socket.recv_string()  # required as part of the zmq REQ-REP protocol
    wait_for_calibration_notification(sub_socket, "started")

    perform_choreography(control_socket, clock_function, clock_offset)

    # Stop calibration
    control_socket.send_string("c")  # lower case c
    control_socket.recv_string()  # required as part of the zmq REQ-REP protocol
    feedback = wait_for_calibration_notification(sub_socket, "successful", "failed")
    if feedback["subject"].endswith("failed"):
        logger.error(f"Calibration failed: {feedback['reason']}")
        raise SystemExit(1)

    # Calibration successful.
    # Create a new subscription socket, subscribe to `gaze`, and start processing
    # the incoming data in real time.


def pupil_remote_connection(ip, port):
    ctx = zmq.Context.instance()
    request_socket = ctx.socket(zmq.REQ)
    request_url = f"tcp://{ip}:{port}"
    logger.info(f"Connecting to {request_url}")
    request_socket.connect(request_url)
    return request_socket


def get_calibration_notification_socket(control_socket, ip):
    control_socket.send_string("SUB_PORT")
    sub_port = control_socket.recv_string()
    sub_url = f"tcp://{ip}:{sub_port}"

    ctx = zmq.Context.instance()
    sub_socket = ctx.socket(zmq.SUB)
    logger.info(f"Subscribing to {sub_url}")
    sub_socket.connect(sub_url)
    sub_socket.subscribe("notify.calibration")
    return sub_socket


def wait_for_calibration_notification(sub_socket, *topic_suffixes):
    while True:
        topic, notification = _recv_notification(sub_socket)
        if any(topic.endswith(suffix) for suffix in topic_suffixes):
            return notification
        else:
            logger.debug(f"Ignoring notification: {topic}")


def clear_socket_queue(socket):
    """Drop all messages in the socket's queue"""
    while socket.get(zmq.EVENTS) & zmq.POLLIN:
        try:
            socket.recv(zmq.NOBLOCK)
        except zmq.ZMQError:
            break


def enable_external_choreography_plugin_selecting_gazer(
    request_socket, gazer_class_name
):
    """Starts the calibration plugin"""
    notification = {
        "subject": "start_plugin",
        "name": "ExternalCalibrationChoreography",
        "args": {"selected_gazer_class_name": gazer_class_name},
    }
    _send_notification(request_socket, notification)


def perform_choreography(control_socket, clock_function, clock_offset):
    # example locations in normalized coordinates
    locations = (
        ([0.0, 1.0], "top left"),
        ([1.0, 1.0], "top right"),
        ([1.0, 0.0], "bottom right"),
        ([0.0, 0.0], "bottom left"),
        ([0.5, 0.5], "center"),
    )
    duration_per_location_seconds = 1.0

    for location_coord, location_human in locations:
        # To simulate a visual stimulus, the subject is instructed to look at a
        # specific location within their field of view for a specific duration.
        # Meanwhile, the code below generates reference locations at a specific
        # sampling rate and sends them to the Pupil Core software.
        # Another choreography client could display a visual stimulus, collect
        # timestamps after each frame, and send the reference data in bulk afterward.
        _instruct_subject(location_human, duration_per_location_seconds)
        ref_data = []
        for _ in _timer(duration_per_location_seconds):
            # Add reference data for both eyes. Coordinates can differ between the two.
            ref_data.append(
                {
                    "norm_pos": location_coord,
                    "timestamp": clock_function() + clock_offset,
                    "eye_id": 0,  # right eye
                }
            )
            ref_data.append(
                {
                    "norm_pos": location_coord,
                    "timestamp": clock_function() + clock_offset,
                    "eye_id": 1,  # left eye
                }
            )
        _send_notification(
            control_socket,
            {"subject": "calibration.add_ref_data", "ref_data": ref_data},
        )


def _instruct_subject(target_location_human_description, duration_seconds):
    input(
        f"Look to the {target_location_human_description}, hit enter, and keep looking"
        f" at the target location for {duration_seconds} seconds"
    )


def _timer(duration_seconds=1.0, sampling_rate_hz=30):
    """Returns control at a fixed rate for `duration_seconds`"""
    num_samples = int(duration_seconds * sampling_rate_hz)
    duration_between_samples = duration_seconds / sampling_rate_hz
    for _ in range(num_samples):
        yield
        time.sleep(duration_between_samples)


def _send_notification(request_socket, notification):
    """Sends ``notification`` to Pupil Remote"""
    topic = "notify." + notification["subject"]
    payload = msgpack.dumps(notification, use_bin_type=True)
    request_socket.send_string(topic, flags=zmq.SNDMORE)
    request_socket.send(payload)
    return request_socket.recv_string()


def _recv_notification(sub_socket):
    """Receives a notification from Pupil Remote"""
    topic = sub_socket.recv_string()
    payload = sub_socket.recv()
    notification = msgpack.unpackb(payload)
    return topic, notification


def _measure_clock_offset(request_socket, clock_function):
    """Calculates the offset between the Pupil Core software clock and a local clock.

    Requesting the remote pupil time takes time. This delay needs to be considered
    when calculating the clock offset. We measure the local time before (A) and
    after (B) the request and assume that the remote pupil time was measured at
    (A+B)/2, i.e. the midpoint between A and B.

    As a result, we have two measurements from two different clocks that were taken
    assumingly at the same point in time. The difference between them ("clock offset")
    allows us, given a new local clock measurement, to infer the corresponding time on
    the remote clock.

    See this helper for reference:
    https://github.com/pupil-labs/pupil-helpers/blob/master/python/simple_realtime_time_sync.py
    """
    local_time_before = clock_function()
    request_socket.send_string("t")
    pupil_time = float(request_socket.recv_string())
    local_time_after = clock_function()

    local_time = (local_time_before + local_time_after) / 2.0
    clock_offset = pupil_time - local_time
    return clock_offset


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    main()
external_calibration_choreography_plugin.py

from calibration_choreography.base_plugin import CalibrationChoreographyPlugin


class ExternalCalibrationChoreography(CalibrationChoreographyPlugin):
    is_user_selectable = False
    shows_action_buttons = False
    is_session_persistent = False

    label = "External Calibration"

    def __init__(self, *args, **kwargs):
        type(self).is_user_selectable = True
        super().__init__(*args, **kwargs)

    def cleanup(self):
        type(self).is_user_selectable = False
        super().cleanup()

    @classmethod
    def _choreography_description_text(cls) -> str:
        return "Choreography that collects reference data from an external client."

    def recent_events(self, events):
        super().recent_events(events)
        if self.is_active:
            self.pupil_list.extend(events["pupil"])

    def on_notify(self, note_dict):
        if note_dict["subject"] == "calibration.add_ref_data":
            self.ref_list += note_dict["ref_data"]
        super().on_notify(note_dict)
gazer_dual_display.py

import logging

from gaze_mapping import Gazer2D
from gaze_mapping.gazer_base import (
    NotEnoughPupilDataError,
    NotEnoughReferenceDataError,
    NotEnoughDataError,
)
from gaze_mapping.gazer_2d import Model2D_Monocular
from gaze_mapping.utils import closest_matches_monocular_batch

logger = logging.getLogger(__name__)


class GazerDualDisplay2D(Gazer2D):
    label = "Dual-Display 2D"

    def init_matcher(self):
        self.matcher = DummyMatcher()

    def fit_on_calib_data(self, calib_data):
        # extract reference data
        ref_data = calib_data["ref_list"]
        # extract and filter pupil data
        pupil_data = calib_data["pupil_list"]
        pupil_data = self.filter_pupil_data(
            pupil_data, self.g_pool.min_calibration_confidence
        )
        if not pupil_data:
            raise NotEnoughPupilDataError
        if not ref_data:
            raise NotEnoughReferenceDataError
        # match pupil to reference data for each eye separately
        matches_right = self.match_pupil_to_ref(pupil_data, ref_data, eye_id=0)
        matches_left = self.match_pupil_to_ref(pupil_data, ref_data, eye_id=1)
        if matches_right[0]:
            self._fit_monocular_model(self.right_model, matches_right)
        else:
            logger.warning("Not enough matching data to fit right model")
        if matches_left[0]:
            self._fit_monocular_model(self.left_model, matches_left)
        else:
            logger.warning("Not enough matching data to fit left model")
        if not self.right_model.is_fitted and not self.left_model.is_fitted:
            raise NotEnoughDataError

    def match_pupil_to_ref(self, pupil_data, ref_data, eye_id):
        ref_data = [ref for ref in ref_data if ref["eye_id"] == eye_id]
        pupil_data = [datum for datum in pupil_data if datum["id"] == eye_id]
        return closest_matches_monocular_batch(ref_data, pupil_data)

    # Overwrite model init functions in case frame size is not available,
    # e.g. external HMD use case
    def _init_left_model(self):
        return NoOutlierRemoval_Model2D_Monocular()

    def _init_right_model(self):
        return NoOutlierRemoval_Model2D_Monocular()

    def _init_binocular_model(self):
        """Just used for code compatibility with base classes"""
        return NoOutlierRemoval_Model2D_Monocular()


class NoOutlierRemoval_Model2D_Monocular(Model2D_Monocular):
    def fit(self, X, Y):
        assert X.shape[0] == Y.shape[0], "Required shape: (n_samples, n_features)"
        self._validate_feature_dimensionality(X)
        self._validate_reference_dimensionality(Y)

        if X.shape[0] == 0:
            raise NotEnoughDataError

        polynomial_features = self._polynomial_features(X)
        self._regressor.fit(polynomial_features, Y)
        self._is_fitted = True


class DummyMatcher:
    """Dummy matcher that simply returns the input pupil datum.

    Matching is only required if you want to build binocular pairs.
    """

    def map_batch(self, pupil_list):
        results = []
        for p in pupil_list:
            results.extend(self.on_pupil_datum(p))
        return results

    def on_pupil_datum(self, p):
        yield [p]
requirements-client.txt

click
msgpack>=1.*
pyzmq