Skip to content

Instantly share code, notes, and snippets.

@marc-tonsen
Created January 8, 2021 12:53
Show Gist options
  • Save marc-tonsen/93b4b1ff0756a5a4be2acdfea1862cd2 to your computer and use it in GitHub Desktop.
Save marc-tonsen/93b4b1ff0756a5a4be2acdfea1862cd2 to your computer and use it in GitHub Desktop.
This gist demonstrates how to receive Pupil Invisible data over the local network in real-time for a simple visualization.
import time
import cv2
import numpy as np
# https://github.com/pupil-labs/pyndsi/tree/v1.0
import ndsi # Main requirement
# Sensor types this example handles; on_network_event() attaches only these
# and main() skips any sensor whose type is not listed here.
SENSOR_TYPES = ["video", "gaze"]
# Connected sensors keyed by sensor UUID. Filled and pruned by
# on_network_event(), read by the polling loop in main().
SENSORS = {} # Will store connected sensors
def main():
    """Discover sensors on the local network and display their live data.

    Starts an ``ndsi.Network`` whose attach/detach events are handled by
    ``on_network_event``, then polls every sensor stored in ``SENSORS``.
    The latest video frame of each sensor is shown in an OpenCV window named
    after the sensor, and the latest gaze sample is drawn as a circle on the
    "PI world v1" frame.

    The loop runs until the network stops or the process is interrupted
    (``KeyboardInterrupt`` / ``SystemExit``, which are swallowed so the
    example exits quietly). The network is stopped on every exit path,
    including unexpected exceptions, which still propagate to the caller.
    """
    # Start auto-discovery of Pupil Invisible Companion devices
    network = ndsi.Network(
        formats={ndsi.DataFormat.V4}, callbacks=(on_network_event,))
    network.start()

    # Latest value per sensor name. Placeholders let the display code run
    # before the first real frame / gaze sample has arrived.
    data_values = {
        "PI world v1": np.zeros((1088, 1080, 3)),
        "PI left v1": np.zeros((400, 400, 3)),
        "PI right v1": np.zeros((400, 400, 3)),
        "Gaze": (0, 0, 0)
    }

    try:
        # Event loop, runs until interrupted
        while network.running:
            # Check for recently connected/disconnected devices
            if network.has_events:
                network.handle_event()

            # Iterate over all connected devices
            for sensor in SENSORS.values():
                if sensor.type not in SENSOR_TYPES:
                    continue

                # Fetch recent sensor configuration changes,
                # required for pyndsi internals
                while sensor.has_notifications:
                    sensor.handle_notification()

                # Fetch recent video frames / gaze samples; only the newest
                # value per sensor name is kept.
                for data in sensor.fetch_data():
                    if data is None:
                        continue
                    if sensor.type == "video":
                        data_values[sensor.name] = data.bgr
                    else:
                        data_values[sensor.name] = data

            # First two components of the gaze value are used as pixel
            # coordinates in the world image.
            gaze_px = (int(data_values["Gaze"][0]),
                       int(data_values["Gaze"][1]))
            # NOTE(review): the circle is drawn in place on the stored frame,
            # so it stays visible until a newer world frame replaces it.
            cv2.circle(
                data_values["PI world v1"],
                gaze_px,
                40, (0, 0, 255), 4
            )

            for key, value in data_values.items():
                if key == "Gaze":
                    continue
                cv2.imshow(key, value)
            # Gives OpenCV time to process window events and redraw
            cv2.waitKey(1)
    except (KeyboardInterrupt, SystemExit):
        # Interruption is the expected way to end the example
        pass
    finally:
        # Disconnect gracefully on every exit path, not only on interruption.
        # Guarded so that an already stopped network is not stopped twice.
        if network.running:
            network.stop()
def on_network_event(network, event):
    """Keep ``SENSORS`` in sync with sensors attaching to / detaching from *network*.

    Registered as a callback on the ``ndsi.Network`` created in ``main``.

    * "attach" event whose sensor type is listed in ``SENSOR_TYPES``: create
      the sensor, enable streaming, request its current controls and store it
      in ``SENSORS`` under its UUID.
    * "detach" event for a UUID present in ``SENSORS``: unlink the sensor and
      drop it from ``SENSORS``.

    Any other event is ignored.
    """
    subject = event["subject"]

    if subject == "attach":
        # Only sensor types this example knows how to display are attached
        if event["sensor_type"] not in SENSOR_TYPES:
            return
        uuid = event["sensor_uuid"]
        sensor = network.sensor(uuid)
        # Start data streaming and request the current configuration
        sensor.set_control_value("streaming", True)
        sensor.refresh_controls()
        # Make the sensor available to the polling loop in main()
        SENSORS[uuid] = sensor
        print(f"Added sensor {sensor}...")
        return

    if subject == "detach":
        uuid = event["sensor_uuid"]
        # Sensors that were never stored here need no cleanup
        if uuid not in SENSORS:
            return
        # Unlink first; the entry is removed only if unlinking succeeded
        SENSORS[uuid].unlink()
        del SENSORS[uuid]
        print(f"Removed sensor {event['sensor_uuid']}...")
# Script entry point: runs the viewer loop until the network stops or the
# process is interrupted (e.g. Ctrl+C).
main() # Execute example
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment