Create a gist now

Instantly share code, notes, and snippets.

@willpatera /Pupil_tools.md Secret
Last active Feb 22, 2017

What would you like to do?
Collection of convenience functions and stand-alone scripts for Pupil.
"""
Make square markers is a helper script that is used to generate 5x5 square marker png files that can be printed.
"""
import cv2
import numpy as np
def encode_marker(mId):
marker_id_str = "%02d"%mId
# marker is based on grid of black (0) /white (1) pixels
# b|b|b|b|b
# b|o|m|o|b b = black border feature
# b|m|m|m|b o = orientation feature
# b|o|m|o|b m = message feature
# b|b|b|b|b
grid = 5
m_with_b = np.zeros((grid,grid),dtype=np.uint8)
m = m_with_b[1:-1,1:-1]
#bitdepth = grid-border squared - 3 for orientation (orientation still yields one bit)
bitdepth = ((5-2)**2)-3
if mId>=(2**bitdepth):
raise Exception("ERROR: ID overflow, this marker can only hold %i bits of information" %bitdepth)
msg = [0]*bitdepth
for i in range(len(msg))[::-1]:
msg[i] = mId%2
mId = mId >>1
# out first bit is encoded in the orientation corners of the marker:
# MSB = 0 MSB = 1
# W|*|*|W ^ B|*|*|B ^
# *|*|*|* / \ *|*|*|* / \
# *|*|*|* | UP *|*|*|* | UP
# B|*|*|W | W|*|*|B |
msb = msg.pop(0)
if msb:
orientation = 0,1,0,0
else:
orientation = 1,0,1,1
m[0,0], m[-1,0], m[-1,-1], m[0,-1] = orientation
msg_mask = np.ones(m.shape,dtype=np.bool)
msg_mask[0,0], msg_mask[-1,0], msg_mask[-1,-1], msg_mask[0,-1] = 0,0,0,0
m[msg_mask] = msg[::-1]
# print "Marker: \n", m_with_b
return m_with_b
def write_marker_png(mId,size):
marker_id_str = "%02d"%mId
m = encode_marker(mId)*255
m = cv2.resize(m,(size,size),interpolation=cv2.INTER_NEAREST)
m = cv2.cvtColor(m,cv2.COLOR_GRAY2BGR)
cv2.imwrite('marker '+marker_id_str+'.png',m)
def write_all_markers_png(size):
rows,cols,m_size = 8,8,7
canvas = np.ones((m_size*rows,m_size*cols),dtype=np.uint8)
for mid in range(64):
marker_with_padding = np.ones((7,7))
marker_with_padding[1:-1,1:-1] = encode_marker(mid)
m_size = marker_with_padding.shape[0]
r = (mid%rows) * m_size
c = (mid/cols) * m_size
canvas[r:r+m_size,c:c+m_size] = marker_with_padding*255
canvas = cv2.resize(canvas,(size,size),interpolation=cv2.INTER_NEAREST)
canvas = cv2.cvtColor(canvas,cv2.COLOR_GRAY2BGR)
cv2.imwrite('all_markers.png',canvas)
if __name__ == '__main__':
write_all_markers_png(2000)
write_marker_png(8,800)
import numpy as np
import cv2
import cPickle as pickle
import os
import csv
"""
Simple utility script that calculates the positions of markers from the surface metrics reports
using the `m_to_screen` transformation matrix and writes to a csv file.
"""
def denormalize(pos, (width, height), flip_y=False):
"""
denormalize
"""
x = pos[0]
y = pos[1]
x *= width
if flip_y:
y = 1-y
y *= height
return x,y
def load_object(file_path):
file_path = os.path.expanduser(file_path)
with open(file_path,'rb') as fh:
return pickle.load(fh)
def ref_surface_to_img(pos,m_to_screen):
shape = pos.shape
pos.shape = (-1,1,2)
new_pos = cv2.perspectiveTransform(pos,m_to_screen )
new_pos.shape = shape
return new_pos
def get_marker_positions_pixels(surf_data_file):
corners = [[0,0],[0,1],[1,1],[1,0]]
data = []
for d,i in zip(srf_data_file,range(len(srf_data_file))):
if d is not None:
data.append([i,[denormalize(ref_surface_to_img(np.array(c,dtype=np.float32),d['m_to_screen']),(1280,720)) for c in corners]])
else:
data.append([i,None])
return data
def write_csv(file_path, data, csv_file_name="marker_positions_pixels.csv"):
with open(os.path.join(os.path.expanduser(file_path),csv_file_name),'wb') as csvfile:
csv_writer = csv.writer(csvfile, delimiter='\t',quotechar='|', quoting=csv.QUOTE_MINIMAL)
csv_writer.writerow(('frame_number', 'marker_positions_in_pixels'))
for i in data:
csv_writer.writerow((i[0],i[1]))
if __name__ == '__main__':
# change the below variables for your recording directory
data_dir = "~/Desktop/Sandbox/Marker_Demo_Unboxing"
metrics_dir = "metrics_0-5993"
metrics_file = "srf_positions_box_front"
srf_data_file = load_object(os.path.join(data_dir,metrics_dir,metrics_file))
write_csv(os.path.join(data_dir,metrics_dir),get_marker_positions_pixels(srf_data_file))
"""
Broadcast dummy Pupil stream over TCP
used for debugging and
"""
import zmq
from ctypes import create_string_buffer
from time import sleep
import random
def test_msg():
dummy_data = str((random.random(),random.random()))
test_msg = "Pupil\nconfidence:0.695435635564\nnorm_gaze:"+dummy_data+"\n"+\
"apparent_pupil_size:41.609413147\nnorm_pupil:(0.76884605884552, 0.35504735310872393)\ntimestamp:1389761135.56\n"
return test_msg
def main():
port = "5000"
context = zmq.Context()
socket = context.socket(zmq.PUB)
address = create_string_buffer("tcp://127.0.0.1:"+port,512)
try:
socket.bind(address.value)
except zmq.ZMQError:
print "Could not set Socket."
for i in range(30):
# send 30 samples and sleep for half a second
socket.send( test_msg() )
sleep(0.5)
context.destroy()
if __name__ == '__main__':
main()
"""
Receive data from Pupil server broadcast over TCP
test script to see what the stream looks like
and for debugging
"""
import zmq
#network setup
port = "5000"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:"+port)
#filter by messages by stating string 'STRING'. '' receives all messages
socket.setsockopt(zmq.SUBSCRIBE, '')
while True:
msg = socket.recv()
print "raw msg:\n", msg
items = msg.split("\n")
msg_type = items.pop(0)
items = dict([i.split(':') for i in items[:-1] ])
if msg_type == 'Pupil':
try:
print "norm_gaze: ", items['norm_gaze']
except KeyError:
pass
else:
# process non gaze position events from plugins here
pass

Pupil Tools

This is a small collection of tools that we use with Pupil. The tools are not part of the main repository, as they are mostly stand-alone convenience scripts.

"""
Receive data from Pupil server broadcast over TCP
test script to see what the stream looks like
and for debugging
"""
import zmq
#network setup
port = "5000"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:"+port)
#filter by messages by stating string 'STRING'. '' receives all messages
socket.setsockopt(zmq.SUBSCRIBE, '')
while True:
msg = socket.recv()
items = msg.split("\n")
msg_type = items.pop(0)
items = dict([i.split(':') for i in items[:-1] ])
if msg_type == 'Pupil':
try:
print "norm_gaze:\t%s\napparent_pupil_size:\t%s" %(items['norm_gaze'], items['apparent_pupil_size'])
except KeyError:
pass
else:
# process non gaze position events from plugins here
pass
"""
Stream Pupil gaze coordinate data using zmq to control a mouse with your eye.
Please note that marker tracking must be enabled, and in this example we have named the surface "screen."
You can name the surface what you like in Pupil capture and then write the name of the surface you'd like to use on line 17.
"""
import zmq
from pymouse import PyMouse
#mouse setup
m = PyMouse()
x_dim, y_dim = m.screen_size()
#network setup
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:5000")
#filter by messages by stating string 'STRING'. '' receives all messages
socket.setsockopt(zmq.SUBSCRIBE, '')
smooth_x, smooth_y= 0.5, 0.5
surface_name = "screen"
while True:
msg = socket.recv()
items = msg.split("\n")
msg_type = items.pop(0)
items = dict([i.split(':') for i in items[:-1] ])
if msg_type == 'Pupil':
try:
gaze_on_screen = items["realtime gaze on "+surface_name]
raw_x,raw_y = map(float,gaze_on_screen[1:-1].split(','))
# smoothing out the gaze so the mouse has smoother movement
smooth_x += 0.5 * (raw_x-smooth_x)
smooth_y += 0.5 * (raw_y-smooth_y)
x = smooth_x
y = smooth_y
y = 1-y # inverting y so it shows up correctly on screen
x *= x_dim
y *= y_dim
# PyMouse or MacOS bugfix - can not go to extreme corners because of hot corners?
x = min(x_dim-10, max(10,x))
y = min(y_dim-10, max(10,y))
m.move(x,y)
except KeyError:
pass
else:
# process non gaze position events from plugins here
pass
"""
Receive data from Pupil server broadcast over TCP
test script to see what the stream looks like
and for debugging
"""
import zmq
#network setup
port = "5000"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:"+port)
#filter by messages by stating string 'STRING'. '' receives all messages
socket.setsockopt(zmq.SUBSCRIBE, '')
surface_name = 'spread'
while True:
msg = socket.recv()
items = msg.split("\n")
msg_type = items.pop(0)
items = dict([i.split(':') for i in items[:-1] ])
if msg_type == 'Pupil':
try:
gp = items['realtime gaze on '+surface_name]
gp_x, gp_y = map(float,gp[1:-1].split(','))
if (0<= gp_x <=1 and 0<= gp_y <=1):
print "gaze on surface: %s\t - normalized coords: %s, %s" %(surface_name, gp_x, gp_y)
except KeyError:
pass
else:
# process non gaze position events from plugins here
pass

I updated the scripts to work with Pupil v0.4.0.
In case anyone needs it (there's almost no change): https://gist.github.com/toshikurauchi/2beb05eb589504ce9f1f

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment