Skip to content

Instantly share code, notes, and snippets.

@dengemann
Created November 11, 2012 18:20
Show Gist options
  • Save dengemann/3d7cb168d5814aab5e5e to your computer and use it in GitHub Desktop.
Save dengemann/3d7cb168d5814aab5e5e to your computer and use it in GitHub Desktop.
Read BTI / 4D data to mne / fiff
#!/usr/bin/env python
# Author: Denis A. Engemann <d.engemann@fz-juelich.de>
# Martin Luessi <mluessi@nmr.mgh.harvard.edu>
# Alexandre Gramfort <gramfort@nmr.mgh.harvard.edu>
# Matti Hamalainen <msh@nmr.mgh.harvard.edu>
# Yuval Harpaz <yuvharpaz@gmail.com>
#
# simplified bsd-3 license
from mne.fiff.constants import Bunch, FIFF
from mne.fiff.raw import Raw, pick_types
import logging
logger = logging.getLogger('mne')
import time
import os.path as op
from datetime import datetime
import numpy as np
FIFF_INFO_CHS_FIELDS = ('loc', 'ch_name', 'unit_mul', 'coil_trans',
'coord_frame', 'coil_type', 'range', 'unit', 'cal', 'eeg_loc',
'scanno', 'kind', 'logno')
FIFF_INFO_CHS_DEFAULTS = (np.array([0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1],
dtype=np.float32), None, 0, None, 0, 0, 1.0,
107, 1.0, None, None, 402, None)
FIFF_INFO_DIG_FIELDS = ("kind", "ident", "r", "coord_frame")
FIFF_INFO_DIG_DEFAULTS = (None, None, None, FIFF.FIFFV_COORD_HEAD)
BTI = Bunch()
BTI.HDR_FILEINFO = 'FILEINFO'
BTI.HDR_DATAFILE = 'DATAFILE'
BTI.HDR_EPOCH_INFORMATION = 'EPOCH INFORMATION'
BTI.HDR_LONGEST_EPOCH = 'LONGEST EPOCH'
BTI.HDR_FIXED_EVENTS = 'FIXED EVENTS'
BTI.HDR_CH_CAL = 'CHANNEL SENSITIVITIES'
BTI.HDR_CH_NAMES = 'CHANNEL LABELS'
BTI.HDR_CH_GROUPS = 'CHANNEL GROUPS'
BTI.HDR_CH_TRANS = 'CHANNEL XFM'
BTI.T_ROT_VV = ((0, -1, 0, 0), (1, 0, 0, 0), (0, 0, 1, 0), (1, 1, 1, 1))
BTI.T_IDENT = ((1, 0, 0, 0), (0, 1, 0, 0), (0, 0, 1, 0), (1, 1, 1, 1))
BTI.T_ROT_IX = slice(0, 3), slice(0, 3)
BTI.T_TRANS_IX = slice(0, 3), slice(3, 4)
BTI.T_SCA_IX = slice(3, 4), slice(0, 4)
# NEUROMAG_XFM = np.array([[0, -1, 0], [1, 0, 0], [0, 0, 1]])
def read_bti_ascii(bti_hdr_fname):
"""Bti ascii export parser
Parameters
----------
bti_hdr_fname : str
Absolute path to the bti ascii header.
Returns
-------
info : dict
bti data info as Python dictionary
"""
f = open(bti_hdr_fname, "r").read()
info = [l for l in f.split("\n") if not l.startswith("#")]
_raw_parsed = {}
current_key = None
for line in info:
if line.isupper() and line.endswith(":"):
current_key = line.strip(":")
_raw_parsed[current_key] = []
else:
_raw_parsed[current_key].append(line)
info = {}
for field, params in _raw_parsed.items():
if field in [BTI.HDR_FILEINFO, BTI.HDR_CH_NAMES,
BTI.HDR_DATAFILE]:
if field == BTI.HDR_DATAFILE:
sep = " : "
elif field == BTI.HDR_FILEINFO:
sep = ":"
else:
sep = None
mapping = [i.strip().split(sep) for i in params]
mapping = [(k.strip(), v.strip()) for k, v in mapping]
if field == BTI.HDR_CH_NAMES:
info[field] = mapping
else:
info[field] = dict(mapping)
if field == BTI.HDR_CH_GROUPS:
ch_groups = {}
for p in params:
if p.endswith("channels"):
ch_groups['CHANNELS'] = int(p.strip().split(' ')[0])
elif "MEG" in p:
ch_groups['MEG'] = int(p.strip().split(' ')[0])
elif "REFERENCE" in p:
ch_groups['REF'] = int(p.strip().split(' ')[0])
elif "EEG" in p:
ch_groups['EEG'] = int(p.strip().split(' ')[0])
elif "TRIGGER" in p:
ch_groups['TRIGGER'] = int(p.strip().split(' ')[0])
elif "UTILITY" in p:
ch_groups['UTILITY'] = int(p.strip().split(' ')[0])
info[BTI.HDR_CH_GROUPS] = ch_groups
elif field == BTI.HDR_CH_CAL:
ch_cal = []
ch_fields = ["ch_name", "group", "cal", "unit"]
for p in params:
this_ch_info = p.strip().split()
ch_cal.append(dict(zip(ch_fields, this_ch_info)))
info[BTI.HDR_CH_CAL] = ch_cal
for field, params in _raw_parsed.items():
if field == BTI.HDR_CH_TRANS:
sensor_trans = {}
idx = 0
for p in params:
if "|" in p:
k, d, _ = p.strip().split("|")
if k.strip().isalnum():
current_chan = info[BTI.HDR_CH_NAMES][idx][0]
sensor_trans[current_chan] = d.strip()
idx += 1
else:
sensor_trans[current_chan] += ", " + d.strip()
info[BTI.HDR_CH_TRANS] = sensor_trans
tsl, duration = _raw_parsed['LONGEST EPOCH'][0].split(', ')
info['FILEINFO']['Time slices'] = tsl.split(': ')[1]
info['FILEINFO']['Total duration'] = duration.strip()
return info
def _get_m_to_nm(adjust=None, translation=(0.0, 0.02, 0.11)):
""" Get general Magnes3600WH to VV transform
Parameters
==========
adjust : int | None
Degrees to tilt x-axis for sensor frame misalignment.
If None, no adjustment will be applied.
translation : array-like
The translation to place the origin of coordinate system
to the center of the head.
Returns
=======
m_nm_t : ndarray
4 x 4 rotation, translation, scaling matrix.
"""
flip_t = np.array(BTI.T_ROT_VV, np.float64)
adjust_t = np.array(BTI.T_IDENT, np.float64)
adjust = 0 if adjust is None else adjust
deg = np.deg2rad(float(adjust))
adjust_t[[1, 2], [1, 2]] = np.cos(deg)
adjust_t[[1, 2], [2, 1]] = -np.sin(deg), np.sin(deg)
m_nm_t = np.ones([4, 4])
m_nm_t[BTI.T_ROT_IX] = np.dot(flip_t[BTI.T_ROT_IX],
adjust_t[BTI.T_ROT_IX])
m_nm_t[BTI.T_TRANS_IX] = np.matrix(translation).T
return m_nm_t
def _read_system_trans(fname):
""" Read global 4D system transform
"""
trans = [l.strip().split(', ') for l in open(fname) if l.startswith(' ')]
return np.array(trans, dtype=float).reshape([4, 4])
def _m_to_nm_coil_trans(ch_t, bti_t, nm_t, nm_default_scale=True):
""" transforms 4D coil position to fiff / Neuromag
"""
# check : inverse_t(apply_t(c1, c1), c1)
ch_t = np.array(ch_t.split(', '), dtype=float).reshape([4, 4])
nm_coil_trans = apply_t(inverse_t(ch_t, bti_t), nm_t)
if nm_default_scale:
nm_coil_trans[3, :3] = 0.
return nm_coil_trans
def inverse_t(x, t, rot=BTI.T_ROT_IX, trans=BTI.T_TRANS_IX, scal=BTI.T_SCA_IX):
""" Apply inverse transform
"""
x = x.copy()
# ... apply scaling
x[scal] *= t[scal]
# ... apply rotation
x[rot] = np.dot(t[rot].T, x[rot])
# ... apply translation
x[trans] -= t[trans]
x[trans] = np.dot(t[rot].T, x[trans])
return x
def apply_t(x, t, rot=BTI.T_ROT_IX, trans=BTI.T_TRANS_IX, scal=BTI.T_SCA_IX):
""" Apply transform
"""
x = x.copy()
# ... apply rotation
x[rot] = np.dot(t[rot], x[rot])
# ... apply translation
x[trans] = np.dot(t[rot], x[trans])
x[trans] += t[trans]
# ... apply scaling
x[scal] *= t[scal]
return x
def cat_t(t1, t2):
""" Helper Function """
t = np.array(BTI.T_IDENT, dtype=np.float32)
t[BTI.T_ROT_IX] = np.dot(t1[BTI.T_ROT_IX], t2[BTI.T_ROT_IX])
t[BTI.T_TRANS_IX] = np.dot(t1[BTI.T_ROT_IX], t2[BTI.T_TRANS_IX])
t[BTI.T_TRANS_IX] += t1[BTI.T_TRANS_IX]
return t
def _read_head_shape(head_shape_fname, use_hpi=False):
""" Read and transform digitation points
from Magnes3600WH to Neuromag
Parameters
==========
head_shape_fname : str
The absolute path to the ascii-exported head shape file.
use_hpi : bool
Whether to treat hpi coils as digitization points or not. If False,
Hpi coils will be discarded.
Returns
=======
dig : list
A list of dictionaries including the dig points and additional info
t : ndarray
The 4 x 4 matrix describing the Magnes3600WH head to Neuromag head
transformation.
"""
target = fiducials = []
dig_points = []
for line in open(head_shape_fname):
if line.startswith('Digitization Points'):
target = dig_points
if line.startswith(' '):
target += [np.array(line.strip().split(), dtype=np.float32)]
fp = np.array(fiducials, dtype=np.float32) # fiducial points
dp = np.sum(fp[2] * (fp[0] - fp[1]))
tmp1, tmp2 = np.sum(fp[2] ** 2), np.sum((fp[0] - fp[1]) ** 2)
dcos = -dp / np.sqrt(tmp1 * tmp2)
dsin = np.sqrt(1. - dcos * dcos)
dt = dp / np.sqrt(tmp2)
fiducials_nm = np.ones([len(fp), 3])
for idx, f in enumerate(fp):
fiducials_nm[idx, 0] = dcos * f[0] - dsin * f[1] + dt
fiducials_nm[idx, 1] = dsin * f[0] + dcos * f[1]
fiducials_nm[idx, 2] = f[2]
# adjust order of fiducials to Neuromag
fiducials_nm[[1, 2]] = fiducials_nm[[2, 1]]
t = np.array(BTI.T_IDENT, dtype=np.float32)
t[0, 0] = dcos
t[0, 1] = -dsin
t[1, 0] = dsin
t[1, 1] = dcos
t[0, 3] = dt
dpnts = np.array(dig_points, dtype=np.float32).T
dig_points_nm = np.dot(t[BTI.T_ROT_IX], dpnts).T
dig_points_nm += t[BTI.T_TRANS_IX].T
all_points = np.r_[fiducials_nm, dig_points_nm]
fiducials_idents = range(1, 4) + range(1, (len(fp) + 1) - 3)
dig = []
for idx in xrange(all_points.shape[0]):
point_info = dict((k, v) for k, v in zip(FIFF_INFO_DIG_FIELDS,
FIFF_INFO_DIG_DEFAULTS))
point_info['r'] = all_points[idx]
if idx < 3:
point_info['kind'] = FIFF.FIFFV_POINT_CARDINAL
point_info['ident'] = fiducials_idents[idx]
if 2 < idx < len(fiducials) and use_hpi:
point_info['kind'] = FIFF.FIFFV_POINT_HPI
point_info['ident'] = fiducials_idents[idx]
elif idx > 4:
point_info['kind'] = FIFF.FIFFV_POINT_EXTRA
point_info['ident'] = (idx + 1) - len(fiducials_idents)
if 2 < idx < len(fiducials) and not use_hpi:
pass
else:
dig.append(point_info)
return dig, t
def _rename_4D_channels(names):
"""Renames appropriately ordered list of channel namaes
"""
renamed = []
count_ref_mag = 0
count_ref_grad = 0
count_eog = 0
count_ext = 0
for i, name in enumerate(names):
if name.startswith('A'):
name = 'MEG %3.3d' % (i + 1)
elif name == 'TRIGGER':
name = 'STI 014'
elif name == 'RESPONSE':
name = 'RSP 001'
elif name.startswith('EOG'):
count_eog += 1
name = 'EOG %3.3d' % count_eog
elif name == 'ECG':
name = 'ECG 001'
elif name == 'UACurrent':
name = 'UTL 001'
elif name.startswith('M'):
count_ref_mag += 1
name = 'RFM %3.3d' % count_ref_mag
elif name.startswith('G'):
count_ref_grad += 1
name = 'RFG %3.3d' % count_ref_grad
elif name.startswith('X'):
count_ext += 1
name = 'EXT %3.3d' % count_ext
renamed.append(name)
return names, renamed
class RawBTI(Raw):
""" intializes object from BTI / 4D asicii exported Magnes3600WH data
Parameters
----------
hdr_fname : str
absolute path to asii header as drawn from msi 'export_data'
data_fname : str
absolute path to asii data as drawn from msi 'export_data'
head_shape_fname : str
absolute path to asii headshape as drawn from bti 'print_hs_file'.
dev_head_t : ndarray | str | None
The device to head transform. If None, an identity matrix is being used.
If str, an BTI ascii file as drawn from the command 'sensor_transformer'
is expected and transformed to the Neuromag coordinate system.
If ndarray, the transform is expected to be aligned with the Neuromag
coordinate system.
data : bool | array-like
if array-like custom data matching the header info to be used
instead of the data from data_fname
adjust : int | None
Degrees to tilt x-axis for sensor frame misalignment.
If None, no adjustment will be applied.
translation : array-like
The translation to place the origin of coordinate system
to the center of the head.
sep : str
seperator used for dates.
use_hpi : bool
Whether to treat hpi coils as digitization points or not. If False,
Hpi coils will be discarded.
"""
def __init__(self, hdr_fname, data_fname, head_shape_fname, dev_head_t=None,
data=None, sep='-', adjust=2, translation=(0.0, 0.02, 0.11),
use_hpi=False):
""" Alternative fiff file constructor
"""
self.hdr = read_bti_ascii(hdr_fname)
self._root, self._hdr_name = op.split(hdr_fname)
self._data_file = data_fname
self.head_shape_fname = head_shape_fname
self._dev_head_t = dev_head_t
self.sep = sep
self._use_hpi = use_hpi
self.bti_to_nm = _get_m_to_nm(adjust, translation)
logger.info(('Initializing Raw instance from BTI/4D ascii-exported'
'data file %s ...' % self._data_file))
info = self._create_raw_info()
self.info = info
self._data = self._read_4D_data() if not data else data
assert len(self._data) == len(self.info['ch_names'])
# get the scalin right
pick_mag = pick_types(info, meg='mag', eeg=False, misc=False,
eog=False, ecg=False)
self._data[pick_mag] *= 1e-15 # put data in Tesla
self.first_samp, self.last_samp = 0, self._data.shape[1] - 1
cals = np.zeros(info['nchan'])
for k in range(info['nchan']):
cals[k] = info['chs'][k]['range'] * \
info['chs'][k]['cal']
self.cals = cals
self.rawdir = None
self.proj = None
self.comp = None
self.verbose = True
if self.verbose:
logger.info(' Range : %d ... %d = %9.3f ... %9.3f secs' % (
self.first_samp, self.last_samp,
float(self.first_samp) / info['sfreq'],
float(self.last_samp) / info['sfreq']))
logger.info('Ready.')
self.fid = None
self._preloaded = True
self._times = np.arange(self.first_samp,
self.last_samp + 1) / info['sfreq']
self._projectors = [None]
self._projector_hashes = [None]
def _create_raw_info(self):
""" Fills list of dicts for initializing empty fiff with 4D data
"""
# intialize info dicitonary and populate it
info = {}
sep = self.sep
d = datetime.strptime(self.hdr['DATAFILE']['Session'],
'%d' + sep + '%y' + sep + '%m %H:%M')
sec = time.mktime(d.timetuple())
info['projs'] = []
info['comps'] = []
info['meas_date'] = np.array([sec, 0], dtype=np.int32)
info['sfreq'] = float(self.hdr["FILEINFO"]["Sample Frequency"][:-2])
info['nchan'] = int(self.hdr["CHANNEL GROUPS"]["CHANNELS"])
channels_4D = np.array([(e[0], i + 1) for i, e in
enumerate(self.hdr["CHANNEL LABELS"])])
ch_names = channels_4D[:, 0].tolist()
ch_lognos = channels_4D[:, 1].tolist()
info['ch_names'] = _rename_4D_channels(ch_names)[1]
ch_mapping = dict(zip(* _rename_4D_channels(ch_names)))
meg_channels = [n for n in info['ch_names'] if n.startswith('MEG')]
ref_magnetometers = [n for n in info['ch_names'] if n.startswith('RFM')]
ref_gradiometers = [n for n in info['ch_names'] if n.startswith('RFG')]
sensor_trans = dict((ch_mapping[k], v) for k, v in
self.hdr['CHANNEL XFM'].items())
info['bads'] = [] # TODO
try: # XXX could be improved
fspec = self.hdr_4D.get('DATAFILE', 'PDF')
fspec = fspec.split(',')[2].split('ord')[0]
ffreqs = fspec.replace('fwsbp', '').split("-")
except:
logger.info("Cannot read any filter specifications." \
"\n No filter info will be set."
ffreqs = 0, 300
info['highpass'], info['lowpass'] = ffreqs
info['acq_pars'], info['acq_stim'] = None, None # both ok
info['filename'] = None # set later on
info['ctf_head_t'] = None # ok like that
info['dev_ctf_t'] = [] # ok like that
info['filenames'] = []
chs = []
# get 4D head_dev_t needed for appropriate
if isinstance(self._dev_head_t, str):
bti_sys_trans = _read_system_trans(self._dev_head_t)
elif hasattr(self._dev_head_t, 'shape'):
if self._dev_head_t.shape != (4, 4):
raise ValueError('A transformation matrix of shape 4 x 4 is'
' expected. Found shape %i x %i instead.'
% self._dev_head_t.shape)
bti_sys_trans = self._dev_head_t
else:
bti_sys_trans = np.array(BTI.T_IDENT, np.float32)
for idx, (chan, logno) in enumerate(zip(info['ch_names'], ch_lognos)):
chan_info = dict((k, v) for k, v in zip(FIFF_INFO_CHS_FIELDS,
FIFF_INFO_CHS_DEFAULTS))
chan_info['ch_name'] = chan
chan_info['logno'] = idx + 1
chan_info['scanno'] = idx + 1
if chan in meg_channels + ref_magnetometers + ref_gradiometers:
t = _m_to_nm_coil_trans(sensor_trans[chan], bti_sys_trans,
self.bti_to_nm)
chan_info['coil_trans'] = t
chan_info['loc'] = np.roll(t.copy().T, 1, 0)[:, :3].flatten()
chan_info['logno'] = idx + 1
if chan in meg_channels:
chan_info['kind'] = FIFF.FIFFV_MEG_CH
chan_info['coil_type'] = FIFF.FIFFV_COIL_MAGNES_MAG
chan_info['coord_frame'] = FIFF.FIFFV_COORD_HEAD
chan_info['unit'] = FIFF.FIFF_UNIT_T
elif chan in ref_magnetometers:
chan_info['kind'] = FIFF.FIFFV_REF_MEG_CH
chan_info['coil_type'] = FIFF.FIFFV_COIL_POINT_MAGNETOMETER
chan_info['coord_frame'] = FIFF.FIFFV_COORD_DEVICE
chan_info['unit'] = FIFF.FIFF_UNIT_T
elif chan in ref_gradiometers:
chan_info['kind'] = FIFF.FIFFV_REF_MEG_CH
chan_info['coil_type'] = FIFF.FIFFV_COIL_AXIAL_GRAD_5CM
chan_info['coord_frame'] = FIFF.FIFFV_COORD_DEVICE
chan_info['unit'] = FIFF.FIFF_UNIT_T_M
elif chan == 'STI 014':
chan_info['kind'] = FIFF.FIFFV_STIM_CH
elif chan.startswith('EOG'):
chan_info['kind'] = FIFF.FIFFV_EOG_CH
elif chan == 'ECG 001':
chan_info['kind'] = FIFF.FIFFV_ECG_CH
elif chan == 'RSP 001':
chan_info['kind'] = FIFF.FIFFV_RESP_CH
elif chan.startswith('EXT'):
chan_info['kind'] = FIFF.FIFFV_MISC_CH
elif chan.startswith('UTL'):
chan_info['kind'] = FIFF.FIFFV_MISC_CH
# other channels implicitly covered
chs.append(chan_info)
info['chs'] = chs
info['meas_id'] = None
info['file_id'] = None
identity = np.array(BTI.T_IDENT, np.float32)
if self.head_shape_fname is not None:
info['dig'], m_h_nm_h = _read_head_shape(self.head_shape_fname,
self._use_hpi)
nm_to_m_sensor = inverse_t(identity, self.bti_to_nm)
nm_sensor_m_head = cat_t(bti_sys_trans, nm_to_m_sensor)
nm_dev_head_t = cat_t(m_h_nm_h, nm_sensor_m_head)
nm_dev_head_t[3, :3] = 0.
else:
nm_dev_head_t = identity
logger.info('Warning. No head shape file provided'
info['dev_head_t'] = {}
info['dev_head_t']['from'] = FIFF.FIFFV_COORD_DEVICE
info['dev_head_t']['to'] = FIFF.FIFFV_COORD_HEAD
info['dev_head_t']['trans'] = nm_dev_head_t
return info
def _read_4D_data(self, count=-1, dtype=np.float32):
""" Reads data from the Juelich 4D format
"""
ntsl = int(self.hdr['FILEINFO']['Time slices'].replace(' slices', ''))
cnt, dtp = count, dtype
with open(self._data_file, 'rb') as f:
data = np.fromfile(file=f, dtype=dtp,
count=cnt).reshape((ntsl,
self.info['nchan']))
return data.T
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment