Skip to content

Instantly share code, notes, and snippets.

@papr
Last active May 2, 2017 13:14
Show Gist options
  • Save papr/b517c31778af436fbb1aaa350a845dc0 to your computer and use it in GitHub Desktop.
Save papr/b517c31778af436fbb1aaa350a845dc0 to your computer and use it in GitHub Desktop.
import zmq
import msgpack as serializer
from time import sleep
# Connect to Pupil Capture
context = zmq.Context()
url = 'tcp://127.0.0.1:50020'
# Open a REQ socket. See http://zguide.zeromq.org/page:all#Ask-and-Ye-Shall-Receive
socket = zmq.Socket(context, zmq.REQ)
socket.connect('tcp://127.0.0.1:50020')
# send notification:
def notify(notification):
"""Sends ``notification`` to Pupil Remote"""
topic = 'notify.' + notification['subject']
payload = serializer.dumps(notification, use_bin_type=True)
socket.send_string(topic, flags=zmq.SNDMORE)
socket.send(payload)
return socket.recv_string()
# define utility function that sends trigger to Pupil Capture
def send_trigger(action, trial):
socket.send_string('t')
pupil_time = float(socket.recv_string()) # get current Pupil Capture time
# create notification dictionary
trigger = {'subject': 'annotation',
'label': 'gretas_trigger {} {}'.format(trial, action),
'timestamp': pupil_time,
'duration': 0.0,
'source': 'local',
'record': True}
notify(trigger)
# Start recording
socket.send_string('R')
print(socket.recv_string())
for trial in range(5):
# Start trial, send start trigger
send_trigger('start', trial)
sleep(5.) # do trial stuff here
# Stop trials, send stop trigger
send_trigger('stop', trial)
sleep(1.) # wait for a sec, not required
# Stop recording
socket.send_string('r')
print(socket.recv_string())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment