Created
March 12, 2018 15:13
-
-
Save pietromarchesi/a7f041ba27d2ee55c48cb4bca244c6ec to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import neo | |
import numpy as np | |
from quantities import s | |
import matplotlib.pyplot as plt | |
import p_utils | |
### EXTRACTING AND PREPARING THE DATA ### | |
# TODO: add animal information | |
ts_folder = '/home/pietro/data/touch&see/' | |
sess_num = 3 | |
F = p_utils.io.loadmat(ts_folder+'/F/F_units/F_All.mat')['F'] | |
rat, day, session = F[sess_num].dir.split('/')[-3:] | |
session_path = ts_folder+'SpikeTrl/'+rat+'/'+day+'/'+session+'/spikeTrl.mat' | |
binned_data_path = 'PositionInfo_30/'+rat+'/'+day+'/'+session+'/binnedData.mat' | |
binned_movement_path = 'PositionInfo_30/'+rat+'/'+day+'/'+session+'/binnedMovementDirection.mat' | |
S = p_utils.io.loadmat(session_path) | |
B = p_utils.io.loadmat(ts_folder+binned_data_path) | |
Bm = p_utils.io.loadmat(ts_folder+binned_movement_path) | |
U = p_utils.io.loadmat(ts_folder + 'unit.mat') | |
first_ts = S['spike']['hdr'][sess_num].FirstTimeStamp | |
spike_ts = S['spike']['timestamp'] | |
last_spike_ts = np.array([sp.max() for sp in spike_ts]) | |
last_spike_s = (last_spike_ts.max() - first_ts)/ 1e6 | |
# trial information | |
trl = S['spikeTrl_imgon']['cfg']['trl'] | |
trial_img_on = (trl[:, 0] - first_ts) / 1e6 + 10 | |
# select only the trials for which the time the animal took between trial start | |
# and point of no return is below 15 seconds | |
trials_ok = trl[:,-1] /1e6 < 15 | |
trial_outcome = trl[:,4] - 1 | |
trial_side = trl[:,5] - 1 | |
trial_type = trl[:,6] - 1 | |
n_trials = trial_img_on.shape[0] | |
# number definitions of the events | |
# S['event']['eventdef'] | |
# sequence of events | |
# S['event']['events'] | |
# times associated with events in events events: | |
eventtimes = (S['event']['ts'] - first_ts) / 1e6 | |
img_on_original = eventtimes[S['event']['events'] == 10] | |
frame_times = B['placedataNanJump']['time'] | |
# unit-level information | |
unit_index = U['unit']['sess'] == sess_num+1 | |
unit_area = U['unit']['area'][unit_index] | |
unit_depth = U['unit']['unitDepth'][unit_index] | |
unit_layer = U['unit']['unitLayer'][unit_index] | |
unit_rat = U['unit']['ratName'][unit_index] | |
unit_sess = U['unit']['sess'][unit_index] | |
unit_sess_num = U['unit']['unitSessNum'][unit_index] | |
unit_label = U['unit']['label'][unit_index] | |
n_units = unit_area.shape[0] | |
# extract the tetrode and channel number from the label | |
unit_tetrode = np.zeros_like(unit_label) | |
for i in range(unit_tetrode.shape[0]): | |
try: | |
tet_num = int(unit_label[i][-3:-2]) | |
except ValueError: | |
tet_num = int(unit_label[i][-4:-3]) | |
unit_tetrode[i] = tet_num | |
unit_channel = np.zeros_like(unit_label) | |
for i in range(unit_channel.shape[0]): | |
chan_num = unit_label[i][-7:-6] | |
if not chan_num in ['A', 'B', 'C', 'D']: | |
chan_num = unit_label[i][-8:-7] | |
if not chan_num in ['A', 'B', 'C', 'D']: | |
raise ValueError('Could not extract unit channel.') | |
x_pos = B['frameBin'][:,0] #x y at every frame | |
y_pos = B['frameBin'][:,1] | |
# run some checks: | |
# preprocessed img_on times are equal to the ones extracted from spikeTrl.event | |
np.testing.assert_array_equal(trial_img_on, img_on_original) | |
# same number of units and spike timestamp vectors | |
np.testing.assert_equal(unit_area.shape, spike_ts.shape) | |
### SETTING UP THE NEO DATASET ### | |
# CREATE EMPTY BLOCK | |
bl = neo.Block(name = 'session %s' %(sess_num+1)) | |
# EXPLAIN THE CODES USED IN THE ANNOTATIONS | |
bl.annotations['trial_outcome'] = '0=bad trial, 1=good trial' | |
bl.annotations['trial_side'] = '0=left, 1=right' | |
bl.annotations['trial_type'] = '0=normal trial, 1=memory trial (image goes' \ | |
'off before movement starts)' | |
# CREATE CHANNELS | |
Hp = neo.ChannelIndex(name='Hippocampus', index=None) | |
Pr = neo.ChannelIndex(name='Perirhinal', index=None) | |
Br = neo.ChannelIndex(name='Barrel', index=None) | |
V1 = neo.ChannelIndex(name='V1', index=None) | |
bl.channel_indexes = [Hp, Pr, Br, V1] #, Pr35, Pr36, Pr3536] | |
# the index in the ChannelIndex is the index of the units belonging to the | |
# channel in the analog signal, so that you can use it to slice it and recover | |
# CREATE UNITS | |
for u in range(n_units): | |
if unit_area[u] != 'tea': | |
np.testing.assert_equal(unit_rat[u], rat) | |
np.testing.assert_equal(unit_sess[u], sess_num+1) | |
np.testing.assert_equal(unit_sess_num[u], u+1) | |
unit = neo.Unit(name='unit '+str(u), | |
area=None, | |
depth=unit_depth[u], | |
layer=unit_layer[u], | |
sess_ind=u, | |
tetrode=unit_tetrode[u], | |
channel=unit_channel[u], | |
general_ind=np.where(unit_index)[0][u]) | |
# ASSIGN UNITS TO AREAS | |
if unit_area[u] == 'Br': | |
unit.annotations['area'] = 'Barrel' | |
Br.units.append(unit) | |
unit.channel_index = Br | |
if unit_area[u] == 'Hp': | |
unit.annotations['area'] = 'Hippocampus' | |
Hp.units.append(unit) | |
unit.channel_index = Hp | |
# perirhinal units from different sub-areas are grouped together, | |
# information about the sub-areas is moved to the annotations. | |
if unit_area[u] == '35': | |
unit.annotations['area'] = 'Perirhinal - Area 35' | |
Pr.units.append(unit) | |
unit.channel_index = Hp | |
if unit_area[u] == '36': | |
unit.annotations['area'] = 'Perirhinal - Area 36' | |
Pr.units.append(unit) | |
unit.channel_index = Pr | |
if unit_area[u] == 'b3536': | |
unit.annotations['area'] = 'Perirhinal - Area 35-36' | |
Pr.units.append(unit) | |
unit.channel_index = Pr | |
if unit_area[u] == 'V': | |
unit.annotations['area'] = 'V1' | |
V1.units.append(unit) | |
unit.channel_index = V1 | |
for t in range(n_trials): | |
if trials_ok[t]: | |
# CREATE THE SEGMENT (TRIAL) | |
seg = neo.Segment(name='Trial %s' %(t+1), | |
index=t+1, | |
trial_outcome=trial_outcome[t], | |
trial_side=trial_side[t], | |
trial_type=trial_type[t]) | |
# ADD EVENTS TO THE TRIAL (and potentially epochs) | |
# this dummy event is added due to a bug https://github.com/NeuralEnsemble/python-neo/issues/336 | |
ev = neo.Event(np.array([trial_img_on[t], trial_img_on[t]+1]) * s, | |
labels=np.array(['Image on', 'Dummy event']), dtype='S') | |
seg.events.append(ev) | |
# ADD SEGMENT TO THE BLOCK | |
bl.segments.append(seg) | |
for un in bl.list_units: | |
# extract the spike train for the specific unit | |
un_ind = un.annotations['sess_ind'] | |
spike_train = (spike_ts[un_ind] - first_ts) / 1e6 | |
# beginning and end of the trial | |
if not t == n_trials-1: | |
t_start, t_stop = trial_img_on[t], trial_img_on[t + 1] | |
else: | |
t_start, t_stop = trial_img_on[t], last_spike_s | |
# get the spike times within the trial | |
# train_trial is the spike train of unit `un` during trial `t` | |
train_trial = spike_train[(spike_train>=t_start)&(spike_train<t_stop)].copy() | |
# GENERATE SPIKETRAIN OBJECT | |
train = neo.SpikeTrain(train_trial*s, | |
t_start=t_start*s, | |
t_stop=t_stop*s) | |
# ASSIGN THE SPIKE TRAIN TO THE TRIAL (SEGMENT) | |
seg.spiketrains.append(train) | |
# ASSIGN THE SPIKE TRAIN TO THE UNIT | |
un.spiketrains.append(train) | |
# ASSIGN THE SEGMENT AND UNIT TO THE SPIKE TRAIN | |
train.segment = seg | |
train.unit = un | |
### EXAMPLES ### | |
# | |
# # Get all the Hippocampal spike trains | |
# Hp_trains = [unit.spiketrains for unit in bl.channel_indexes[0].units] | |
# | |
# # Get all the Hippocampal spike trains of trial 12 | |
# # First, find trial 12 using the index attribute | |
# trial12 = [seg for seg in bl.segments if seg.index == 12][0] | |
# # then get all the spike trains from that segments whose units belong to | |
# # the channel with index 0 | |
# trains = [tr for tr in trial12.spiketrains if tr.unit.channel_index.index == 0] | |
# ### SAVE THE SESSION ### | |
# from neo.io.neomatlabio import NeoMatlabIO | |
# filename = 'TZ_sess_'+str(sess_num)+'.mat' | |
# w = NeoMatlabIO(filename=filename) | |
# w.write_block(bl) | |
# | |
### READ THE SESSION ### | |
#r = neo.io.NeoMatlabIO(filename=filename) | |
#bl = r.read_block() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment