Skip to content

Instantly share code, notes, and snippets.

@inglesp
Created September 3, 2020 11:31
Show Gist options
  • Save inglesp/3d612e535e4772b2adef6f9aac822d12 to your computer and use it in GitHub Desktop.
Save inglesp/3d612e535e4772b2adef6f9aac822d12 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
'''
Created on Wed Jun 17 15:21:46 2020
@author: JTM
'''
from time import time
from threading import Thread
import numpy as np
import msgpack
import zmq
class PupilCore():
'''
A Class for Pupil Core and the remote helper.
'''
def __init__(self, address='127.0.0.1', request_port='50020'):
self.context = zmq.Context()
self.address = address
self.request_port = request_port
self.remote = zmq.Socket(self.context, zmq.REQ)
self.remote.connect('tcp://{}:{}'.format(self.address,
self.request_port))
# Request 'SUB_PORT' for reading data
self.remote.send_string('SUB_PORT')
self.sub_port = self.remote.recv_string()
# Request 'PUB_PORT' for writing data
self.remote.send_string('PUB_PORT')
self.pub_port = self.remote.recv_string()
# Open socket for publishing
self.pub_socket = zmq.Socket(self.context, zmq.PUB)
self.pub_socket.connect('tcp://{}:{}'.format(self.address, self.pub_port))
def command(self, cmd):
'''
Send a command via Pupil Remote.
Parameters
----------
cmd : string
Must be one of the following:
'R' - start recording with auto generated session name
'R rec_name' - start recording named "rec_name"
'r' - stop recording
'C' - start currently selected calibration
'c' - stop currently selected calibration
'T 1234.56' - resets current Pupil time to given timestamp
't' - get current Pupil time; returns a float as string
'v' - get the Pupil Core software version string
'PUB_PORT' - return the current pub port of the IPC Backbone
'SUB_PORT' - return the current sub port of the IPC Backbone
Returns
-------
string
the result of the command. If the command was not acceptable, this
will be 'Unknown command.'
'''
self.remote.send_string(cmd)
return self.remote.recv_string()
def notify(self, notification):
'''
Send a notification to Pupil Remote. Every notification has a topic
and can contain potential payload data. The payload data has to be
serializable, so not every Python object will work. To find out which
plugins send and receive notifications, open the codebase and search
for `.notify_all(` and `def on_notify(`.
Parameters
----------
pupil_remote : zmq.sugar.socket.Socket
the pupil remote helper.
notification : dict
the notification dict. Some examples:
- {'subject':'start_plugin', 'name':'Annotation_Capture', 'args':{}})
- {'subject':'recording.should_start', 'session_name':'my session'}
- {'subject':'recording.should_stop'}
Returns
-------
string
the response.
'''
topic = 'notify.' + notification['subject']
payload = msgpack.dumps(notification, use_bin_type=True)
self.remote.send_string(topic, flags=zmq.SNDMORE)
self.remote.send(payload)
return self.remote.recv_string()
def send_trigger(self, trigger):
'''
Send an annotation (a.k.a 'trigger') to Pupil Capture. Use to mark the
timing of events.
Parameters
----------
pub_socket : zmq.sugar.socket.Socket
a socket to publish the trigger.
trigger : dict
customiseable - see the new_trigger(...) function.
Returns
-------
None.
'''
payload = msgpack.dumps(trigger, use_bin_type=True)
self.pub_socket.send_string(trigger['topic'], flags=zmq.SNDMORE)
self.pub_socket.send(payload)
class LightStamper(Thread):
'''
A thread-bound class which uses the Pupil Core World Camera to detect the
onset of a light and send an annotation (a.k.a 'trigger') to Pupil Capture
with the associated Pupil timestamp. Useful for integrating with virtually
any light source. Supports extraction of PLRs and calcultion of time-critical
measures such as latency and time-to-peak consctriction. Inherits from
threading.Thread and overrides the .run() method. Instantiate a few seconds
before administering a light stimulus. Works very well with the following
settings in Pupil Capture:
1. Resolution (320, 240) for eye and world
2. Frame rate 120 for eye and world
3. Auto Exposure mode - Manual Exposure - eye and world
4. Absolute exposure time 60 for world, 63 for eye
5. Frame publisher format - BGR
Attributes
----------
detected : bool
Whether a light gets detected
timestamp : None, float
The pupil timestamp associated with the onset of a light
pupil : pupil.PupilCore
PupilCore class instance
trigger : dict
a dictionary with at least the following:
{'topic': 'annotation',
'label': 'our_label',
'timestamp': None}
timestamp will be overwritten with the new pupil timestamp for the
detected light. See new_trigger(...) for more info.
threshold : int
detection threshold for luminance increase. The right value depends on
the nature of the light stimulus and the ambient lighting conditions.
Requires some guesswork right now, but it would be good to have a
function that works it out for us.
wait_time : float, optional
time to wait in seconds before giving up (will run indefinitely if
no value is passed, in which case will require LightStamper.join()).
Use when controlling a light source programmatically. For STLAB, use
6.0 s, because on rare occasions it can take about 5 seconds
to process a request. The default in None.
Example
-------
label = 'LIGHT_ON'
trigger = new_trigger(label)
threshold = 15
wait_time = 5.
lst = LightStamper(pupil, trigger, threshold, wait_time)
lst.start()
'''
def __init__(self, pupil, trigger, threshold, wait_time=None):
'''
Parameters
----------
pupil : pupil.PupilCore
PupilCore class instance
trigger : dict
See new_triger(...)
threshold : int
Detection threshold for luminance increase.
wait_time : float, optional
Time to wait in seconds before giving up. The default is None.
Returns
-------
None.
'''
super(LightStamper, self).__init__()
self.pupil = pupil
self.trigger = trigger
self.threshold = threshold
self.wait_time = wait_time
self.successful = False
self.timestamp = None
# a unique, encapsulated subscription to frame.world
self.subscriber = self.pupil.context.socket(zmq.SUB)
self.subscriber.connect('tcp://{}:{}'.format(pupil.address, pupil.sub_port))
self.subscriber.setsockopt_string(zmq.SUBSCRIBE, 'frame.world')
# override threading.Thread.run() method with light detection code
def run(self):
recent_world = None
recent_world_minus_one = None
recent_world_ts = None
if self.wait_time is None:
self.wait_time, t1, t2 = 0, -1, -2 # dummy values
else:
t1 = time()
t2 = time()
print('Waiting for a light to stamp...')
while not self.successful and (t2 - t1) < self.wait_time:
topic, msg = recv_from_subscriber(self.subscriber)
if topic == 'frame.world':
recent_world = np.frombuffer(
msg['__raw_data__'][0], dtype=np.uint8).reshape(
msg['height'], msg['width'], 3)
recent_world_ts = msg['timestamp']
if recent_world is not None and recent_world_minus_one is not None:
diff = recent_world.mean() - recent_world_minus_one.mean()
if diff > self.threshold:
print('Light stamped at {}'.format(recent_world_ts))
self.trigger['timestamp'] = recent_world_ts # change trigger timestamp
self.pupil.send_trigger(self.trigger)
self.timestamp = recent_world_ts
self.successful = True
recent_world_minus_one = recent_world
if self.wait_time > 0:
t2 = time()
if self.successful == False:
print('LightStamper failed to detect a light...')
class PupilGrabber(Thread):
'''
A thread-bound class for grabbing data from Pupil Core.
'''
def __init__(self, pupil, topic, secs):
'''
Start grabbing some data from pupil.
Parameters
----------
pupil : pupil.PupilCore
PupilCore class instance
topic : str
Grab data with this topic
secs : TYPE
Ammount of time to spend grabbing data.
Returns
-------
None.
'''
super(PupilGrabber, self).__init__()
self.pupil = pupil
self.topic = topic
self.secs = secs
self.data = []
# a unique, encapsulated subscription to frame.world
self.subscriber = self.pupil.context.socket(zmq.SUB)
self.subscriber.connect(f'tcp://{self.pupil.address}:{self.pupil.sub_port}')
self.subscriber.subscribe(self.topic)
# override threading.Thread.run() method with code for grabbing data
def run(self):
print('PupilGrabber now grabbing {} seconds of {}'.format(
self.secs, self.topic))
t1, t2 = time(), time()
while t2 - t1 < self.secs:
topic, payload = self.subscriber.recv_multipart()
message = msgpack.loads(payload)
self.data.append(message)
t2 = time()
print('PupilGrabber done grabbing {} seconds of {}'.format(
self.secs, self.topic))
def get(self, what):
'''
Get grabbed data.
Parameters
----------
what : string
The data to get.
Returns
-------
np.array()
The requested data..
'''
return [entry[what.encode()] for entry in self.data]
def notify(pupil_remote, notification):
'''
Send a notification to Pupil Remote.
Parameters
----------
pupil_remote : zmq.sugar.socket.Socket
the pupil remote helper.
notification : dict
the notification dict.
e.g. {'subject':'start_plugin', 'name':'Annotation_Capture'}
Returns
-------
string
the response.
'''
topic = 'notify.' + notification['subject']
payload = msgpack.dumps(notification, use_bin_type=True)
pupil_remote.send_string(topic, flags=zmq.SNDMORE)
pupil_remote.send(payload)
return pupil_remote.recv_string()
def send_trigger(pub_socket, trigger):
'''
Send an annotation (a.k.a 'trigger') to Pupil Capture. Use to mark the
timing of events.
Parameters
----------
pub_socket : zmq.sugar.socket.Socket
a socket to publish the trigger.
trigger : dict
customiseable - see the new_trigger(...) function.
Returns
-------
None.
'''
payload = msgpack.dumps(trigger, use_bin_type=True)
pub_socket.send_string(trigger['topic'], flags=zmq.SNDMORE)
pub_socket.send(payload)
def new_trigger(label, custom_fields={}):
'''
Create a new trigger / annotation / message / event marker / whatever
you want to call it. Send it to Pupil Capture with the send_trigger(...)
function.
Parameters
----------
label : string
A label for the event.
custom_fields : dict, optional
Any additional information to add (e.g. {'duration':2, 'color':'blue'}).
The default is {}.
Returns
-------
trigger : dict
The trigger dictionary, ready to be sent.
'''
trigger = {
'topic' : 'annotation',
'label' : label,
'timestamp' : time()
}
for k, v in custom_fields.items():
trigger[k] = v
return trigger
def recv_from_subscriber(subscriber):
'''
Receive a message with topic and payload.
Parameters
----------
subscriber : zmq.sugar.socket.Socket
a subscriber to any valid topic.
Returns
-------
topic : str
A utf-8 encoded string, returned as a unicode object.
payload : dict
A msgpack serialized dictionary, returned as a python dictionary.
Any addional message frames will be added as a list in the payload
dictionary with key: '__raw_data__'. To use frame data, say:
np.frombuffer(msg['__raw_data__'][0], dtype=np.uint8).reshape(
msg['height'], msg['width'], 3)
'''
topic = subscriber.recv_string()
payload = msgpack.unpackb(subscriber.recv(), encoding='utf-8')
extra_frames = []
while subscriber.get(zmq.RCVMORE):
extra_frames.append(subscriber.recv())
if extra_frames:
payload['__raw_data__'] = extra_frames
return topic, payload
def detect_light_onset(subscriber,
pub_socket,
trigger,
threshold,
wait_time=None):
'''
Use the Pupil Core World Camera to detect the onset of a light and send
an annotation (a.k.a 'trigger') to Pupil Capture with the associated
Pupil timestamp. Useful for extracting PLRs and calculating time-critical
measures such as latency and time-to-peak consctriction. Start this function
before administering a light stimulus, and use a separate thread if controlling
the light programmatically from the same script. Tested with the following
settings in Pupil Capture:
1. Resolution (320, 240) for eye and world
2. Frame rate 120 for eye and world
3. Auto Exposure mode - Manual Exposure - eye and wold
4. Absolute exposure time 60 for world, 63 for eye
5. Frame publisher format - BGR
Parameters
----------
subscriber : zmq.sugar.socket.Socket
a socket subscribed to 'frame.world'
pub_socket : zmq.sugar.socket.Socket
a socket to publish the trigger using send_trigger(...)
trigger : dict
a dictionary with at least the following:
{'topic': 'annotation',
'label': 'our_label',
'timestamp': None}
timestamp will be overwritten with the new pupil timestamp for the
detected light. See new_trigger(...) for more info.
threshold : int
detection threshold for luminance increase. The right value depends on
the nature of the light stimulus and the ambient lighting conditions.
Requires some guesswork right now, but it would be good to have a
function that works it out for us.
wait_time : float, optional
time to wait in seconds before giving up (will run indefinitely if
no value is passed). Use when controlling a light source programmatically
/ running the function in its own thread. For STLAB, use 6.0 s,
because on rare occasions it can take about 5 seconds to process a
request. The default in None.
Returns
-------
None
'''
recent_world = None
recent_world_minus_one = None
recent_world_ts = None
detected = False
if wait_time is None:
wait_time, t1, t2 = 0, -1, -2 # dummy values
else:
t1 = time()
t2 = time()
print('Waiting for the light...')
while not detected and (t2 - t1) < wait_time:
topic, msg = recv_from_subscriber(subscriber)
if topic == 'frame.world':
recent_world = np.frombuffer(
msg['__raw_data__'][0], dtype=np.uint8).reshape(
msg['height'], msg['width'], 3)
recent_world_ts = msg['timestamp']
if recent_world is not None and recent_world_minus_one is not None:
diff = recent_world.mean() - recent_world_minus_one.mean()
if diff > threshold:
print('Light detected at {}'.format(recent_world_ts))
trigger['timestamp'] = recent_world_ts # change trigger timestamp
send_trigger(pub_socket, trigger)
detected = True
break # not sure if this is required
recent_world_minus_one = recent_world
if wait_time > 0:
t2 = time()
if detected == False:
print('Failed to detect a light.')
# def find_threshold(subscriber):
# world_data = []
# print('Shine a light...')
# while True:
# topic, msg = recv_from_subscriber(subscriber)
# recent_world = np.frombuffer(
# msg['__raw_data__'][0], dtype=np.uint8).reshape(
# msg['height'], msg['width'], 3)
# world_data.append(recent_world.mean())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment