Created
June 23, 2020 15:13
-
-
Save papr/81163ada21e29469133bd5202de6893e to your computer and use it in GitHub Desktop.
Externalized examples to read Pupil Core recording data. Code taken from https://github.com/pupil-labs/pupil
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import collections | |
import msgpack | |
import numpy as np | |
# Container for one loaded topic: the deserialized data, the timestamps
# loaded alongside it, and the topic string stored with each datum.
PLData = collections.namedtuple("PLData", "data timestamps topics")
def serialized_dict_from_msgpack_bytes(data):
    """Unpack a msgpack-encoded payload into Python objects.

    Args:
        data: msgpack-encoded bytes.

    Returns:
        Whatever ``msgpack.unpackb`` produces for ``data`` with ``raw=False``
        and ``use_list=False``; extension values are routed through
        ``msgpack_unpacking_ext_hook``.
    """
    unpack_options = {
        "raw": False,
        "use_list": False,
        "ext_hook": msgpack_unpacking_ext_hook,
    }
    return msgpack.unpackb(data, **unpack_options)
def msgpack_unpacking_ext_hook(self, code, data=None):
    """Decode msgpack extension values encountered while unpacking.

    msgpack invokes an ``ext_hook`` with two positional arguments,
    ``(code, data)``. This function used to require a leading ``self`` as
    well, so every extension value made unpacking fail with ``TypeError``.
    Both call forms are accepted now:

    * ``msgpack_unpacking_ext_hook(code, data)`` -- how msgpack calls it
    * ``msgpack_unpacking_ext_hook(self, code, data)`` -- legacy form;
      ``self`` is ignored

    Args:
        self: Ignored placeholder in the legacy three-argument form; holds
            the extension code in the two-argument form.
        code: Extension type code (holds the payload in the two-argument
            form).
        data: Extension payload bytes. Left at ``None`` when the function is
            called with two arguments.

    Returns:
        The recursively unpacked payload when the code marks a serialized
        dict, otherwise an ``msgpack.ExtType`` wrapping the raw payload.
    """
    SERIALIZED_DICT_MSGPACK_EXT_CODE = 13

    if data is None:
        # Two-argument call: every value arrived one slot to the left.
        # ``None`` was never a usable payload, so it is a safe marker.
        self, code, data = None, self, code

    if code == SERIALIZED_DICT_MSGPACK_EXT_CODE:
        return serialized_dict_from_msgpack_bytes(data)
    return msgpack.ExtType(code, data)
def load_pldata_file(directory, topic):
    """Load the recorded data and timestamps of one topic.

    Reads ``<topic>_timestamps.npy`` and ``<topic>.pldata`` from
    ``directory``. The ``.pldata`` file is consumed as a stream of
    ``(topic, payload)`` pairs; each payload is deserialized with
    ``serialized_dict_from_msgpack_bytes``.

    Args:
        directory: Folder containing the two files.
        topic: File name stem, e.g. ``"gaze"``.

    Returns:
        ``PLData(data, timestamps, topics)``. ``data`` and ``topics`` are
        lists in file order and ``timestamps`` is the result of ``np.load``.
        If either file does not exist, all three fields are empty lists.
    """
    timestamps_path = os.path.join(directory, topic + "_timestamps.npy")
    pldata_path = os.path.join(directory, topic + ".pldata")

    try:
        timestamps = np.load(timestamps_path)
        with open(pldata_path, "rb") as stream:
            entries = [
                (entry_topic, serialized_dict_from_msgpack_bytes(payload))
                for entry_topic, payload in msgpack.Unpacker(
                    stream, raw=False, use_list=False
                )
            ]
    except FileNotFoundError:
        # A missing file is reported as an empty result, not as an error.
        return PLData([], [], [])

    return PLData(
        [datum for _, datum in entries],
        timestamps,
        [entry_topic for entry_topic, _ in entries],
    )
if __name__ == "__main__":
    import pprint

    # Change `path` so that it points to your own recording folder.
    path = "/Users/me/recordings/2020_06_19/001"

    # Loads "gaze.pldata" together with "gaze_timestamps.npy".
    gaze = load_pldata_file(path, "gaze")
    printer = pprint.PrettyPrinter(indent=4)

    print(">>> FIRST GAZE TIMESTAMP:")
    printer.pprint(gaze.timestamps[0])
    print()

    print(">>> FIRST GAZE DATUM:")
    printer.pprint(gaze.data[0])
    print()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import csv | |
import json | |
def read_key_value_file(csvfile):
    """Parse an open two-column key/value text file into a dict.

    The delimiter (comma or tab) is sniffed from the first line. If the first
    line contains both ``key`` and ``value`` it is treated as a header and
    skipped; otherwise it is read as data.

    Rows with fewer than two fields are skipped. ``csv.reader`` yields an
    empty list for a blank line, which used to raise ``IndexError``.

    Args:
        csvfile: Seekable text file object positioned at the start.

    Returns:
        Dict mapping the first field of each row to its second field, both
        as strings. Later rows overwrite earlier ones with the same key.
    """
    kvstore = {}  # init key value store
    first_line = csvfile.readline()
    if "key" not in first_line or "value" not in first_line:
        csvfile.seek(0)  # Seek to start if first_line is not a header
    dialect = csv.Sniffer().sniff(first_line, delimiters=",\t")
    reader = csv.reader(
        csvfile, dialect, quoting=csv.QUOTE_NONE, escapechar="\\"
    )  # create reader
    for row in reader:
        if len(row) < 2:
            # Blank or single-field line: there is no key/value pair to store.
            continue
        kvstore[row[0]] = row[1]
    return kvstore
def get_start_time_synced_s(rec_dir):
    """Return the synced start time of a recording, in seconds.

    ``info.player.json`` is preferred; ``info.csv`` is only consulted when
    the JSON file does not exist.

    Args:
        rec_dir: Path of the recording folder.

    Returns:
        The ``start_time_synced_s`` entry of ``info.player.json`` or the
        ``Start Time (Synced)`` entry of ``info.csv``, converted to ``float``.

    Raises:
        ValueError: If neither info file exists in ``rec_dir``.
    """
    info_json_path = os.path.join(rec_dir, "info.player.json")
    info_csv_path = os.path.join(rec_dir, "info.csv")
    if os.path.exists(info_json_path):
        with open(info_json_path, "r") as file:
            info_json = json.load(file)
        return float(info_json["start_time_synced_s"])
    elif os.path.exists(info_csv_path):
        with open(info_csv_path, "r") as file:
            info_csv = read_key_value_file(file)
        return float(info_csv["Start Time (Synced)"])
    # Message corrected: the old text read "Couldn't find neither ... not ...".
    raise ValueError("Could find neither info.player.json nor info.csv")
if __name__ == "__main__":
    # Change `path` so that it points to your own recording folder.
    path = "/Users/me/recordings/2020_06_19/001"

    # Look up the recording start time (seconds) before printing anything.
    rec_start_time_s = get_start_time_synced_s(path)
    print(">>> RECORDING START TIME:", rec_start_time_s, sep="\n")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment