Skip to content

Instantly share code, notes, and snippets.

@pietromarchesi
Created March 12, 2018 15:13
Show Gist options
  • Save pietromarchesi/a7f041ba27d2ee55c48cb4bca244c6ec to your computer and use it in GitHub Desktop.
Save pietromarchesi/a7f041ba27d2ee55c48cb4bca244c6ec to your computer and use it in GitHub Desktop.
import os
import neo
import numpy as np
from quantities import s
import matplotlib.pyplot as plt
import p_utils
### EXTRACTING AND PREPARING THE DATA ###
# TODO: add animal information
ts_folder = '/home/pietro/data/touch&see/'
sess_num = 3
F = p_utils.io.loadmat(ts_folder+'/F/F_units/F_All.mat')['F']
rat, day, session = F[sess_num].dir.split('/')[-3:]
session_path = ts_folder+'SpikeTrl/'+rat+'/'+day+'/'+session+'/spikeTrl.mat'
binned_data_path = 'PositionInfo_30/'+rat+'/'+day+'/'+session+'/binnedData.mat'
binned_movement_path = 'PositionInfo_30/'+rat+'/'+day+'/'+session+'/binnedMovementDirection.mat'
S = p_utils.io.loadmat(session_path)
B = p_utils.io.loadmat(ts_folder+binned_data_path)
Bm = p_utils.io.loadmat(ts_folder+binned_movement_path)
U = p_utils.io.loadmat(ts_folder + 'unit.mat')
first_ts = S['spike']['hdr'][sess_num].FirstTimeStamp
spike_ts = S['spike']['timestamp']
last_spike_ts = np.array([sp.max() for sp in spike_ts])
last_spike_s = (last_spike_ts.max() - first_ts)/ 1e6
# trial information
trl = S['spikeTrl_imgon']['cfg']['trl']
trial_img_on = (trl[:, 0] - first_ts) / 1e6 + 10
# select only the trials for which the time the animal took between trial start
# and point of no return is below 15 seconds
trials_ok = trl[:,-1] /1e6 < 15
trial_outcome = trl[:,4] - 1
trial_side = trl[:,5] - 1
trial_type = trl[:,6] - 1
n_trials = trial_img_on.shape[0]
# number definitions of the events
# S['event']['eventdef']
# sequence of events
# S['event']['events']
# times associated with events in events events:
eventtimes = (S['event']['ts'] - first_ts) / 1e6
img_on_original = eventtimes[S['event']['events'] == 10]
frame_times = B['placedataNanJump']['time']
# unit-level information
unit_index = U['unit']['sess'] == sess_num+1
unit_area = U['unit']['area'][unit_index]
unit_depth = U['unit']['unitDepth'][unit_index]
unit_layer = U['unit']['unitLayer'][unit_index]
unit_rat = U['unit']['ratName'][unit_index]
unit_sess = U['unit']['sess'][unit_index]
unit_sess_num = U['unit']['unitSessNum'][unit_index]
unit_label = U['unit']['label'][unit_index]
n_units = unit_area.shape[0]
# extract the tetrode and channel number from the label
unit_tetrode = np.zeros_like(unit_label)
for i in range(unit_tetrode.shape[0]):
try:
tet_num = int(unit_label[i][-3:-2])
except ValueError:
tet_num = int(unit_label[i][-4:-3])
unit_tetrode[i] = tet_num
unit_channel = np.zeros_like(unit_label)
for i in range(unit_channel.shape[0]):
chan_num = unit_label[i][-7:-6]
if not chan_num in ['A', 'B', 'C', 'D']:
chan_num = unit_label[i][-8:-7]
if not chan_num in ['A', 'B', 'C', 'D']:
raise ValueError('Could not extract unit channel.')
x_pos = B['frameBin'][:,0] #x y at every frame
y_pos = B['frameBin'][:,1]
# run some checks:
# preprocessed img_on times are equal to the ones extracted from spikeTrl.event
np.testing.assert_array_equal(trial_img_on, img_on_original)
# same number of units and spike timestamp vectors
np.testing.assert_equal(unit_area.shape, spike_ts.shape)
### SETTING UP THE NEO DATASET ###
# CREATE EMPTY BLOCK
bl = neo.Block(name = 'session %s' %(sess_num+1))
# EXPLAIN THE CODES USED IN THE ANNOTATIONS
bl.annotations['trial_outcome'] = '0=bad trial, 1=good trial'
bl.annotations['trial_side'] = '0=left, 1=right'
bl.annotations['trial_type'] = '0=normal trial, 1=memory trial (image goes' \
'off before movement starts)'
# CREATE CHANNELS
Hp = neo.ChannelIndex(name='Hippocampus', index=None)
Pr = neo.ChannelIndex(name='Perirhinal', index=None)
Br = neo.ChannelIndex(name='Barrel', index=None)
V1 = neo.ChannelIndex(name='V1', index=None)
bl.channel_indexes = [Hp, Pr, Br, V1] #, Pr35, Pr36, Pr3536]
# the index in the ChannelIndex is the index of the units belonging to the
# channel in the analog signal, so that you can use it to slice it and recover
# CREATE UNITS
for u in range(n_units):
if unit_area[u] != 'tea':
np.testing.assert_equal(unit_rat[u], rat)
np.testing.assert_equal(unit_sess[u], sess_num+1)
np.testing.assert_equal(unit_sess_num[u], u+1)
unit = neo.Unit(name='unit '+str(u),
area=None,
depth=unit_depth[u],
layer=unit_layer[u],
sess_ind=u,
tetrode=unit_tetrode[u],
channel=unit_channel[u],
general_ind=np.where(unit_index)[0][u])
# ASSIGN UNITS TO AREAS
if unit_area[u] == 'Br':
unit.annotations['area'] = 'Barrel'
Br.units.append(unit)
unit.channel_index = Br
if unit_area[u] == 'Hp':
unit.annotations['area'] = 'Hippocampus'
Hp.units.append(unit)
unit.channel_index = Hp
# perirhinal units from different sub-areas are grouped together,
# information about the sub-areas is moved to the annotations.
if unit_area[u] == '35':
unit.annotations['area'] = 'Perirhinal - Area 35'
Pr.units.append(unit)
unit.channel_index = Hp
if unit_area[u] == '36':
unit.annotations['area'] = 'Perirhinal - Area 36'
Pr.units.append(unit)
unit.channel_index = Pr
if unit_area[u] == 'b3536':
unit.annotations['area'] = 'Perirhinal - Area 35-36'
Pr.units.append(unit)
unit.channel_index = Pr
if unit_area[u] == 'V':
unit.annotations['area'] = 'V1'
V1.units.append(unit)
unit.channel_index = V1
for t in range(n_trials):
if trials_ok[t]:
# CREATE THE SEGMENT (TRIAL)
seg = neo.Segment(name='Trial %s' %(t+1),
index=t+1,
trial_outcome=trial_outcome[t],
trial_side=trial_side[t],
trial_type=trial_type[t])
# ADD EVENTS TO THE TRIAL (and potentially epochs)
# this dummy event is added due to a bug https://github.com/NeuralEnsemble/python-neo/issues/336
ev = neo.Event(np.array([trial_img_on[t], trial_img_on[t]+1]) * s,
labels=np.array(['Image on', 'Dummy event']), dtype='S')
seg.events.append(ev)
# ADD SEGMENT TO THE BLOCK
bl.segments.append(seg)
for un in bl.list_units:
# extract the spike train for the specific unit
un_ind = un.annotations['sess_ind']
spike_train = (spike_ts[un_ind] - first_ts) / 1e6
# beginning and end of the trial
if not t == n_trials-1:
t_start, t_stop = trial_img_on[t], trial_img_on[t + 1]
else:
t_start, t_stop = trial_img_on[t], last_spike_s
# get the spike times within the trial
# train_trial is the spike train of unit `un` during trial `t`
train_trial = spike_train[(spike_train>=t_start)&(spike_train<t_stop)].copy()
# GENERATE SPIKETRAIN OBJECT
train = neo.SpikeTrain(train_trial*s,
t_start=t_start*s,
t_stop=t_stop*s)
# ASSIGN THE SPIKE TRAIN TO THE TRIAL (SEGMENT)
seg.spiketrains.append(train)
# ASSIGN THE SPIKE TRAIN TO THE UNIT
un.spiketrains.append(train)
# ASSIGN THE SEGMENT AND UNIT TO THE SPIKE TRAIN
train.segment = seg
train.unit = un
### EXAMPLES ###
#
# # Get all the Hippocampal spike trains
# Hp_trains = [unit.spiketrains for unit in bl.channel_indexes[0].units]
#
# # Get all the Hippocampal spike trains of trial 12
# # First, find trial 12 using the index attribute
# trial12 = [seg for seg in bl.segments if seg.index == 12][0]
# # then get all the spike trains from that segments whose units belong to
# # the channel with index 0
# trains = [tr for tr in trial12.spiketrains if tr.unit.channel_index.index == 0]
# ### SAVE THE SESSION ###
# from neo.io.neomatlabio import NeoMatlabIO
# filename = 'TZ_sess_'+str(sess_num)+'.mat'
# w = NeoMatlabIO(filename=filename)
# w.write_block(bl)
#
### READ THE SESSION ###
#r = neo.io.NeoMatlabIO(filename=filename)
#bl = r.read_block()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment