mne_bids.write with convert
# -*- coding: utf-8 -*-
"""Test the MNE BIDS converter.
For each supported file format, implement a test.
"""
# Authors: Mainak Jas <mainak.jas@telecom-paristech.fr>
# Teon L Brooks <teon.brooks@gmail.com>
# Chris Holdgraf <choldgraf@berkeley.edu>
# Stefan Appelhoff <stefan.appelhoff@mailbox.org>
# Matt Sanderson <matt.sanderson@mq.edu.au>
#
# License: BSD (3-clause)
import os
import os.path as op
import pytest
from glob import glob
from datetime import datetime
import platform
import shutil as sh
import json
from distutils.version import LooseVersion
import numpy as np
from numpy.testing import assert_array_equal, assert_array_almost_equal
import mne
from mne.datasets import testing
from mne.utils import (_TempDir, run_subprocess, check_version,
requires_nibabel, requires_version)
from mne.io.constants import FIFF
from mne.io.kit.kit import get_kit_info
from mne_bids import (write_raw_bids, read_raw_bids, make_bids_basename,
make_bids_folders, write_anat)
from mne_bids.tsv_handler import _from_tsv, _to_tsv
from mne_bids.utils import _find_matching_sidecar
from mne_bids.pick import coil_type
base_path = op.join(op.dirname(mne.__file__), 'io')
subject_id = '01'
subject_id2 = '02'
session_id = '01'
run = '01'
acq = '01'
run2 = '02'
task = 'testing'
bids_basename = make_bids_basename(
subject=subject_id, session=session_id, run=run, acquisition=acq,
task=task)
bids_basename_minimal = make_bids_basename(subject=subject_id, task=task)
# WINDOWS issues:
# the bids-validator development version does not work properly on Windows as
# of 2019-06-25 --> https://github.com/bids-standard/bids-validator/issues/790
# As a workaround, we try to get the path to the executable from an environment
# variable VALIDATOR_EXECUTABLE ... if this is not possible, we assume we are
# using the stable bids-validator and call bids-validator directly
# also: for windows, shell = True is needed to call npm, bids-validator etc.
# see: https://stackoverflow.com/q/28891053/5201771
@pytest.fixture(scope="session")
def _bids_validate():
"""Fixture to run BIDS validator."""
shell = False
bids_validator_exe = ['bids-validator', '--config.error=41',
'--config.error=41']
if platform.system() == 'Windows':
shell = True
exe = os.getenv('VALIDATOR_EXECUTABLE', 'n/a')
if exe != 'n/a':
bids_validator_exe = ['node', exe]
def _validate(output_path):
cmd = bids_validator_exe + [output_path]
run_subprocess(cmd, shell=shell)
return _validate
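# For illustration: the fixture above amounts to running roughly this command
# (assuming a stable bids-validator is installed and on PATH):
#
#     bids-validator --config.error=41 --config.error=41 /path/to/bids_root
#
# On Windows the executable path is taken from the VALIDATOR_EXECUTABLE
# environment variable and invoked through node instead.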
@requires_version('pybv', '0.2.0')
def _test_anonymize(raw, bids_basename, events_fname=None, event_id=None):
output_path = _TempDir()
write_raw_bids(raw, bids_basename, output_path,
events_data=events_fname,
event_id=event_id, anonymize=dict(daysback=600),
overwrite=False)
scans_tsv = make_bids_basename(
subject=subject_id, session=session_id, suffix='scans.tsv',
prefix=op.join(output_path, 'sub-01', 'ses-01'))
data = _from_tsv(scans_tsv)
assert datetime.strptime(data['acq_time'][0],
'%Y-%m-%dT%H:%M:%S').year < 1925
return output_path
@requires_version('pybv', '0.2.0')
def _test_convert(raw, bids_basename, events_fname=None, event_id=None):
output_path = _TempDir()
with pytest.warns(UserWarning, match='Converting data files to ' +
'BrainVision format'):
write_raw_bids(raw, bids_basename, output_path,
events_data=events_fname,
event_id=event_id, convert=True,
overwrite=False, verbose=True)
return output_path
@requires_version('pybv', '0.2.0')
def test_fif(_bids_validate):
"""Test functionality of the write_raw_bids conversion for fif."""
output_path = _TempDir()
data_path = testing.data_path()
raw_fname = op.join(data_path, 'MEG', 'sample',
'sample_audvis_trunc_raw.fif')
event_id = {'Auditory/Left': 1, 'Auditory/Right': 2, 'Visual/Left': 3,
'Visual/Right': 4, 'Smiley': 5, 'Button': 32}
events_fname = op.join(data_path, 'MEG', 'sample',
'sample_audvis_trunc_raw-eve.fif')
raw = mne.io.read_raw_fif(raw_fname)
write_raw_bids(raw, bids_basename, output_path, events_data=events_fname,
event_id=event_id, overwrite=False)
# Read the file back in to check that the data has come through cleanly.
# Events and bad channel information are read via the sidecar files.
raw2 = read_raw_bids(bids_basename + '_meg.fif', output_path)
assert set(raw.info['bads']) == set(raw2.info['bads'])
events, _ = mne.events_from_annotations(raw2)
events2 = mne.read_events(events_fname)
events2 = events2[events2[:, 2] != 0]
assert_array_equal(events2[:, 0], events[:, 0])
# check if write_raw_bids works when there is no stim channel
raw.set_channel_types({raw.ch_names[i]: 'misc'
for i in
mne.pick_types(raw.info, stim=True, meg=False)})
output_path = _TempDir()
with pytest.warns(UserWarning, match='No events found or provided.'):
write_raw_bids(raw, bids_basename, output_path, overwrite=False)
_bids_validate(output_path)
# try with eeg data only (conversion to bv)
output_path = _TempDir()
raw = mne.io.read_raw_fif(raw_fname)
raw.load_data()
raw2 = raw.pick_types(meg=False, eeg=True, stim=True, eog=True, ecg=True)
raw2.save(op.join(output_path, 'test-raw.fif'), overwrite=True)
raw2 = mne.io.Raw(op.join(output_path, 'test-raw.fif'), preload=False)
with pytest.warns(UserWarning,
match='Converting data files to BrainVision format'):
write_raw_bids(raw2, bids_basename, output_path,
events_data=events_fname, event_id=event_id,
verbose=True, overwrite=False)
os.remove(op.join(output_path, 'test-raw.fif'))
bids_dir = op.join(output_path, 'sub-%s' % subject_id,
'ses-%s' % session_id, 'eeg')
for sidecar in ['channels.tsv', 'eeg.eeg', 'eeg.json', 'eeg.vhdr',
'eeg.vmrk', 'events.tsv']:
assert op.isfile(op.join(bids_dir, bids_basename + '_' + sidecar))
raw2 = mne.io.read_raw_brainvision(op.join(bids_dir,
bids_basename + '_eeg.vhdr'))
assert_array_almost_equal(raw.get_data(), raw2.get_data())
_bids_validate(output_path)
# write the same data but pretend it is empty room data:
raw = mne.io.read_raw_fif(raw_fname)
er_date = datetime.fromtimestamp(
raw.info['meas_date'][0]).strftime('%Y%m%d')
er_bids_basename = 'sub-emptyroom_ses-{0}_task-noise'.format(str(er_date))
write_raw_bids(raw, er_bids_basename, output_path, overwrite=False)
assert op.exists(op.join(
output_path, 'sub-emptyroom', 'ses-{0}'.format(er_date), 'meg',
'sub-emptyroom_ses-{0}_task-noise_meg.json'.format(er_date)))
# test that an incorrect date raises an error.
er_bids_basename_bad = 'sub-emptyroom_ses-19000101_task-noise'
with pytest.raises(ValueError, match='Date provided'):
write_raw_bids(raw, er_bids_basename_bad, output_path, overwrite=False)
# give the raw object some fake participant data (potentially overwriting)
raw = mne.io.read_raw_fif(raw_fname)
raw.info['subject_info'] = {'his_id': subject_id2,
'birthday': (1993, 1, 26), 'sex': 1}
write_raw_bids(raw, bids_basename, output_path, events_data=events_fname,
event_id=event_id, overwrite=True)
# assert age of participant is correct
participants_tsv = op.join(output_path, 'participants.tsv')
data = _from_tsv(participants_tsv)
assert data['age'][data['participant_id'].index('sub-01')] == '9'
# try and write preloaded data
raw = mne.io.read_raw_fif(raw_fname, preload=True)
with pytest.raises(ValueError, match='preloaded'):
write_raw_bids(raw, bids_basename, output_path,
events_data=events_fname, event_id=event_id,
overwrite=False)
# test anonymize
raw = mne.io.read_raw_fif(raw_fname)
raw.anonymize()
data_path2 = _TempDir()
raw_fname2 = op.join(data_path2, 'sample_audvis_raw.fif')
raw.save(raw_fname2)
bids_basename2 = bids_basename.replace(subject_id, subject_id2)
raw = mne.io.read_raw_fif(raw_fname2)
bids_output_path = write_raw_bids(raw, bids_basename2, output_path,
events_data=events_fname,
event_id=event_id, overwrite=False)
# check that the overwrite parameters work correctly for the participant
# data
# change the gender but don't force overwrite.
raw.info['subject_info'] = {'his_id': subject_id2,
'birthday': (1994, 1, 26), 'sex': 2}
with pytest.raises(FileExistsError, match="already exists"): # noqa: F821
write_raw_bids(raw, bids_basename2, output_path,
events_data=events_fname, event_id=event_id,
overwrite=False)
# now force the overwrite
write_raw_bids(raw, bids_basename2, output_path, events_data=events_fname,
event_id=event_id, overwrite=True)
with pytest.raises(ValueError, match='raw_file must be'):
write_raw_bids('blah', bids_basename, output_path)
bids_basename2 = 'sub-01_ses-01_xyz-01_run-01'
with pytest.raises(KeyError, match='Unexpected entity'):
write_raw_bids(raw, bids_basename2, output_path)
bids_basename2 = 'sub-01_run-01_task-auditory'
with pytest.raises(ValueError, match='ordered correctly'):
write_raw_bids(raw, bids_basename2, output_path, overwrite=True)
del raw._filenames
with pytest.raises(ValueError, match='raw.filenames is missing'):
write_raw_bids(raw, bids_basename2, output_path)
_bids_validate(output_path)
assert op.exists(op.join(output_path, 'participants.tsv'))
# asserting that single fif files do not include the part key
files = glob(op.join(bids_output_path, 'sub-' + subject_id2,
'ses-' + subject_id2, 'meg', '*.fif'))
for ii, FILE in enumerate(files):
assert 'part' not in FILE
assert ii < 1
# test keyword mne-bids anonymize
raw = mne.io.read_raw_fif(raw_fname)
with pytest.raises(ValueError, match='`daysback` must be a positive'):
write_raw_bids(raw, bids_basename, output_path,
events_data=events_fname,
event_id=event_id,
anonymize=dict(daysback=-10),
overwrite=True)
output_path = _TempDir()
raw = mne.io.read_raw_fif(raw_fname)
with pytest.raises(ValueError, match='`daysback` is too large'):
write_raw_bids(raw, bids_basename, output_path,
events_data=events_fname,
event_id=event_id,
anonymize=dict(daysback=10000),
overwrite=False)
output_path = _TempDir()
raw = mne.io.read_raw_fif(raw_fname)
write_raw_bids(raw, bids_basename, output_path,
events_data=events_fname,
event_id=event_id,
anonymize=dict(daysback=600, keep_his=True),
overwrite=False)
scans_tsv = make_bids_basename(
subject=subject_id, session=session_id, suffix='scans.tsv',
prefix=op.join(output_path, 'sub-01', 'ses-01'))
data = _from_tsv(scans_tsv)
assert datetime.strptime(data['acq_time'][0],
'%Y-%m-%dT%H:%M:%S').year < 1925
_bids_validate(output_path)
# check that split files have part key
raw = mne.io.read_raw_fif(raw_fname)
data_path3 = _TempDir()
raw_fname3 = op.join(data_path3, 'sample_audvis_raw.fif')
raw.save(raw_fname3, buffer_size_sec=1.0, split_size='10MB',
split_naming='neuromag', overwrite=True)
raw = mne.io.read_raw_fif(raw_fname3)
subject_id3 = '03'
bids_basename3 = bids_basename.replace(subject_id, subject_id3)
bids_output_path = write_raw_bids(raw, bids_basename3, output_path,
overwrite=False)
files = glob(op.join(bids_output_path, 'sub-' + subject_id3,
'ses-' + subject_id3, 'meg', '*.fif'))
for FILE in files:
assert 'part' in FILE
def test_kit(_bids_validate):
"""Test functionality of the write_raw_bids conversion for KIT data."""
output_path = _TempDir()
data_path = op.join(base_path, 'kit', 'tests', 'data')
raw_fname = op.join(data_path, 'test.sqd')
events_fname = op.join(data_path, 'test-eve.txt')
hpi_fname = op.join(data_path, 'test_mrk.sqd')
hpi_pre_fname = op.join(data_path, 'test_mrk_pre.sqd')
hpi_post_fname = op.join(data_path, 'test_mrk_post.sqd')
electrode_fname = op.join(data_path, 'test_elp.txt')
headshape_fname = op.join(data_path, 'test_hsp.txt')
event_id = dict(cond=1)
kit_bids_basename = bids_basename.replace('_acq-01', '')
raw = mne.io.read_raw_kit(
raw_fname, mrk=hpi_fname, elp=electrode_fname,
hsp=headshape_fname)
write_raw_bids(raw, kit_bids_basename, output_path,
events_data=events_fname,
event_id=event_id, overwrite=False)
_bids_validate(output_path)
assert op.exists(op.join(output_path, 'participants.tsv'))
read_raw_bids(kit_bids_basename + '_meg.sqd', output_path)
# test anonymize
output_path = _test_anonymize(raw, kit_bids_basename,
events_fname, event_id)
_bids_validate(output_path)
# ensure the channels file has no STI 014 channel:
channels_tsv = make_bids_basename(
subject=subject_id, session=session_id, task=task, run=run,
suffix='channels.tsv',
prefix=op.join(output_path, 'sub-01', 'ses-01', 'meg'))
data = _from_tsv(channels_tsv)
assert 'STI 014' not in data['name']
# ensure the marker file is produced in the right place
marker_fname = make_bids_basename(
subject=subject_id, session=session_id, task=task, run=run,
suffix='markers.sqd',
prefix=op.join(output_path, 'sub-01', 'ses-01', 'meg'))
assert op.exists(marker_fname)
# test attempts at writing invalid event data
event_data = np.loadtxt(events_fname)
# make the data the wrong number of dimensions
event_data_3d = np.atleast_3d(event_data)
other_output_path = _TempDir()
with pytest.raises(ValueError, match='two dimensions'):
write_raw_bids(raw, bids_basename, other_output_path,
events_data=event_data_3d, event_id=event_id,
overwrite=True)
# remove 3rd column
event_data = event_data[:, :2]
with pytest.raises(ValueError, match='second dimension'):
write_raw_bids(raw, bids_basename, other_output_path,
events_data=event_data, event_id=event_id,
overwrite=True)
# test correct naming of marker files
raw = mne.io.read_raw_kit(
raw_fname, mrk=[hpi_pre_fname, hpi_post_fname], elp=electrode_fname,
hsp=headshape_fname)
write_raw_bids(raw,
kit_bids_basename.replace('sub-01', 'sub-%s' % subject_id2),
output_path, events_data=events_fname, event_id=event_id,
overwrite=False)
_bids_validate(output_path)
# ensure the marker files are renamed correctly
marker_fname = make_bids_basename(
subject=subject_id2, session=session_id, task=task, run=run,
suffix='markers.sqd', acquisition='pre',
prefix=os.path.join(output_path, 'sub-02', 'ses-01', 'meg'))
info = get_kit_info(marker_fname, False)[0]
assert info['meas_date'] == get_kit_info(hpi_pre_fname,
False)[0]['meas_date']
marker_fname = marker_fname.replace('acq-pre', 'acq-post')
info = get_kit_info(marker_fname, False)[0]
assert info['meas_date'] == get_kit_info(hpi_post_fname,
False)[0]['meas_date']
# check that providing markers in the wrong order raises an error
raw = mne.io.read_raw_kit(
raw_fname, mrk=[hpi_post_fname, hpi_pre_fname], elp=electrode_fname,
hsp=headshape_fname)
with pytest.raises(ValueError, match='Markers'):
write_raw_bids(
raw,
kit_bids_basename.replace('sub-01', 'sub-%s' % subject_id2),
output_path, events_data=events_fname, event_id=event_id,
overwrite=True)
def test_ctf(_bids_validate):
"""Test functionality of the write_raw_bids conversion for CTF data."""
output_path = _TempDir()
data_path = op.join(testing.data_path(download=False), 'CTF')
raw_fname = op.join(data_path, 'testdata_ctf.ds')
raw = mne.io.read_raw_ctf(raw_fname)
with pytest.warns(UserWarning, match='No line frequency'):
write_raw_bids(raw, bids_basename, output_path=output_path)
_bids_validate(output_path)
with pytest.warns(UserWarning, match='Did not find any events'):
raw = read_raw_bids(bids_basename + '_meg.ds', output_path)
# test to check that running again with overwrite == False raises an error
with pytest.raises(FileExistsError, match="already exists"): # noqa: F821
write_raw_bids(raw, bids_basename, output_path=output_path)
assert op.exists(op.join(output_path, 'participants.tsv'))
# test anonymize
raw = mne.io.read_raw_ctf(raw_fname)
with pytest.warns(UserWarning,
match='Converting to FIF for anonymization'):
output_path = _test_anonymize(raw, bids_basename)
_bids_validate(output_path)
def test_bti(_bids_validate):
"""Test functionality of the write_raw_bids conversion for BTi data."""
output_path = _TempDir()
data_path = op.join(base_path, 'bti', 'tests', 'data')
raw_fname = op.join(data_path, 'test_pdf_linux')
config_fname = op.join(data_path, 'test_config_linux')
headshape_fname = op.join(data_path, 'test_hs_linux')
raw = mne.io.read_raw_bti(raw_fname, config_fname=config_fname,
head_shape_fname=headshape_fname)
write_raw_bids(raw, bids_basename, output_path, verbose=True)
assert op.exists(op.join(output_path, 'participants.tsv'))
_bids_validate(output_path)
raw = read_raw_bids(bids_basename + '_meg', output_path)
# test anonymize
raw = mne.io.read_raw_bti(raw_fname, config_fname=config_fname,
head_shape_fname=headshape_fname)
with pytest.warns(UserWarning,
match='Converting to FIF for anonymization'):
output_path = _test_anonymize(raw, bids_basename)
_bids_validate(output_path)
# XXX: vhdr test currently passes only on MNE master. Skip until next release.
# see: https://github.com/mne-tools/mne-python/pull/6558
@pytest.mark.skipif(LooseVersion(mne.__version__) < LooseVersion('0.19'),
reason="requires mne 0.19.dev0 or higher")
def test_vhdr(_bids_validate):
"""Test write_raw_bids conversion for BrainVision data."""
output_path = _TempDir()
data_path = op.join(base_path, 'brainvision', 'tests', 'data')
raw_fname = op.join(data_path, 'test.vhdr')
raw = mne.io.read_raw_brainvision(raw_fname)
# inject a bad channel
assert not raw.info['bads']
injected_bad = ['FP1']
raw.info['bads'] = injected_bad
# write with injected bad channels
write_raw_bids(raw, bids_basename_minimal, output_path, overwrite=False)
_bids_validate(output_path)
# read and also get the bad channels
raw = read_raw_bids(bids_basename_minimal + '_eeg.vhdr', output_path)
# Check that injected bad channel shows up in raw after reading
np.testing.assert_array_equal(np.asarray(raw.info['bads']),
np.asarray(injected_bad))
# Test that correct channel units are written ... and that bad channel
# is in channels.tsv
channels_tsv_name = op.join(output_path, 'sub-{}'.format(subject_id),
'eeg', bids_basename_minimal + '_channels.tsv')
data = _from_tsv(channels_tsv_name)
assert data['units'][data['name'].index('FP1')] == 'µV'
assert data['units'][data['name'].index('CP5')] == 'n/a'
assert data['status'][data['name'].index(injected_bad[0])] == 'bad'
# check events.tsv is written
events_tsv_fname = channels_tsv_name.replace('channels', 'events')
assert op.exists(events_tsv_fname)
# create another bids folder with the overwrite command and check
# no files are in the folder
data_path = make_bids_folders(subject=subject_id, kind='eeg',
output_path=output_path, overwrite=True)
assert len([f for f in os.listdir(data_path) if op.isfile(f)]) == 0
# test anonymize and convert
raw = mne.io.read_raw_brainvision(raw_fname)
_test_anonymize(raw, bids_basename)
_test_convert(raw, bids_basename)
# Also cover iEEG
# We use the same data and pretend that eeg channels are ecog
raw = mne.io.read_raw_brainvision(raw_fname)
raw.set_channel_types({raw.ch_names[i]: 'ecog'
for i in mne.pick_types(raw.info, eeg=True)})
output_path = _TempDir()
write_raw_bids(raw, bids_basename, output_path, overwrite=False)
_bids_validate(output_path)
def test_edf(_bids_validate):
"""Test write_raw_bids conversion for European Data Format data."""
output_path = _TempDir()
data_path = op.join(testing.data_path(), 'EDF')
raw_fname = op.join(data_path, 'test_reduced.edf')
raw = mne.io.read_raw_edf(raw_fname, preload=True)
# XXX: hack that should be fixed later. Annotation reading is
# broken for this file with preload=False and read_annotations_edf
raw.preload = False
raw.rename_channels({raw.info['ch_names'][0]: 'EOG'})
raw.info['chs'][0]['coil_type'] = FIFF.FIFFV_COIL_EEG_BIPOLAR
raw.rename_channels({raw.info['ch_names'][1]: 'EMG'})
raw.set_channel_types({'EMG': 'emg'})
write_raw_bids(raw, bids_basename, output_path)
# Reading the file back should raise an error, because we renamed channels
# in `raw` and used that information to write a channels.tsv. Yet, we
# saved the unchanged `raw` in the BIDS folder, so channels in the TSV and
# in raw clash
with pytest.raises(RuntimeError, match='Channels do not correspond'):
read_raw_bids(bids_basename + '_eeg.edf', output_path)
bids_fname = bids_basename.replace('run-01', 'run-%s' % run2)
write_raw_bids(raw, bids_fname, output_path, overwrite=True)
_bids_validate(output_path)
# ensure there is an EMG channel in the channels.tsv:
channels_tsv = make_bids_basename(
subject=subject_id, session=session_id, task=task, run=run,
suffix='channels.tsv', acquisition=acq,
prefix=op.join(output_path, 'sub-01', 'ses-01', 'eeg'))
data = _from_tsv(channels_tsv)
assert 'ElectroMyoGram' in data['description']
# check that the scans list contains two scans
scans_tsv = make_bids_basename(
subject=subject_id, session=session_id, suffix='scans.tsv',
prefix=op.join(output_path, 'sub-01', 'ses-01'))
data = _from_tsv(scans_tsv)
assert len(list(data.values())[0]) == 2
# Also cover iEEG
# We use the same data and pretend that eeg channels are ecog
raw.set_channel_types({raw.ch_names[i]: 'ecog'
for i in mne.pick_types(raw.info, eeg=True)})
output_path = _TempDir()
write_raw_bids(raw, bids_basename, output_path)
_bids_validate(output_path)
# test anonymize and convert
raw = mne.io.read_raw_edf(raw_fname, preload=True)
raw.preload = False
output_path = _test_anonymize(raw, bids_basename)
_bids_validate(output_path)
output_path = _test_convert(raw, bids_basename)
_bids_validate(output_path)
def test_bdf(_bids_validate):
"""Test write_raw_bids conversion for Biosemi data."""
output_path = _TempDir()
data_path = op.join(base_path, 'edf', 'tests', 'data')
raw_fname = op.join(data_path, 'test.bdf')
raw = mne.io.read_raw_bdf(raw_fname)
with pytest.warns(UserWarning, match='No line frequency found'):
write_raw_bids(raw, bids_basename, output_path, overwrite=False)
_bids_validate(output_path)
# Test also the reading of channel types from channels.tsv
# the first channel in the raw data is not MISC right now
test_ch_idx = 0
assert coil_type(raw.info, test_ch_idx) != 'misc'
# we will change the channel type to MISC and overwrite the channels file
bids_fname = bids_basename + '_eeg.bdf'
channels_fname = _find_matching_sidecar(bids_fname, output_path,
'channels.tsv')
channels_dict = _from_tsv(channels_fname)
channels_dict['type'][test_ch_idx] = 'MISC'
_to_tsv(channels_dict, channels_fname)
# Now read the raw data back from BIDS, with the tampered TSV, to show
# that the channels.tsv truly influences how read_raw_bids sets ch_types
# in the raw data object
raw = read_raw_bids(bids_fname, output_path)
assert coil_type(raw.info, test_ch_idx) == 'misc'
# Test cropped assertion error
raw = mne.io.read_raw_bdf(raw_fname)
raw.crop(0, raw.times[-2])
with pytest.raises(AssertionError, match='cropped'):
write_raw_bids(raw, bids_basename, output_path)
# test anonymize and convert
raw = mne.io.read_raw_bdf(raw_fname)
output_path = _test_anonymize(raw, bids_basename)
_bids_validate(output_path)
output_path = _test_convert(raw, bids_basename)
_bids_validate(output_path)
def test_set(_bids_validate):
"""Test write_raw_bids conversion for EEGLAB data."""
# standalone .set file
output_path = _TempDir()
data_path = op.join(testing.data_path(), 'EEGLAB')
# .set with associated .fdt
output_path = _TempDir()
data_path = op.join(testing.data_path(), 'EEGLAB')
raw_fname = op.join(data_path, 'test_raw.set')
raw = mne.io.read_raw_eeglab(raw_fname)
# embedded - test mne-version assertion
tmp_version = mne.__version__
mne.__version__ = '0.16'
with pytest.raises(ValueError, match='Your version of MNE is too old.'):
write_raw_bids(raw, bids_basename, output_path)
mne.__version__ = tmp_version
# proceed with the actual test for EEGLAB data
write_raw_bids(raw, bids_basename, output_path, overwrite=False)
read_raw_bids(bids_basename + '_eeg.set', output_path)
with pytest.raises(FileExistsError, match="already exists"): # noqa: F821
write_raw_bids(raw, bids_basename, output_path=output_path,
overwrite=False)
_bids_validate(output_path)
# check events.tsv is written
# XXX: only from 0.18 onwards because events_from_annotations
# is broken for earlier versions
events_tsv_fname = op.join(output_path, 'sub-' + subject_id,
'ses-' + session_id, 'eeg',
bids_basename + '_events.tsv')
if check_version('mne', '0.18'):
assert op.exists(events_tsv_fname)
# Also cover iEEG
# We use the same data and pretend that eeg channels are ecog
raw.set_channel_types({raw.ch_names[i]: 'ecog'
for i in mne.pick_types(raw.info, eeg=True)})
output_path = _TempDir()
write_raw_bids(raw, bids_basename, output_path)
_bids_validate(output_path)
# test anonymize and convert
output_path = _test_anonymize(raw, bids_basename)
_bids_validate(output_path)
output_path = _test_convert(raw, bids_basename)
_bids_validate(output_path)
@requires_nibabel()
def test_write_anat(_bids_validate):
"""Test writing anatomical data."""
# Get the MNE testing sample data
import nibabel as nib
output_path = _TempDir()
data_path = testing.data_path()
raw_fname = op.join(data_path, 'MEG', 'sample',
'sample_audvis_trunc_raw.fif')
event_id = {'Auditory/Left': 1, 'Auditory/Right': 2, 'Visual/Left': 3,
'Visual/Right': 4, 'Smiley': 5, 'Button': 32}
events_fname = op.join(data_path, 'MEG', 'sample',
'sample_audvis_trunc_raw-eve.fif')
raw = mne.io.read_raw_fif(raw_fname)
write_raw_bids(raw, bids_basename, output_path, events_data=events_fname,
event_id=event_id, overwrite=False)
# Write some MRI data and supply a `trans`
trans_fname = raw_fname.replace('_raw.fif', '-trans.fif')
trans = mne.read_trans(trans_fname)
# Get the T1 weighted MRI data file
# Needs to be converted to Nifti because we only have mgh in our test base
t1w_mgh = op.join(data_path, 'subjects', 'sample', 'mri', 'T1.mgz')
anat_dir = write_anat(output_path, subject_id, t1w_mgh, session_id, acq,
raw=raw, trans=trans, deface=True, verbose=True,
overwrite=True)
_bids_validate(output_path)
# Validate that files are as expected
t1w_json_path = op.join(anat_dir, 'sub-01_ses-01_acq-01_T1w.json')
assert op.exists(t1w_json_path)
assert op.exists(op.join(anat_dir, 'sub-01_ses-01_acq-01_T1w.nii.gz'))
with open(t1w_json_path, 'r') as f:
t1w_json = json.load(f)
print(t1w_json)
# We only should have AnatomicalLandmarkCoordinates as key
np.testing.assert_array_equal(list(t1w_json.keys()),
['AnatomicalLandmarkCoordinates'])
# And within AnatomicalLandmarkCoordinates only LPA, NAS, RPA in that order
anat_dict = t1w_json['AnatomicalLandmarkCoordinates']
point_list = ['LPA', 'NAS', 'RPA']
np.testing.assert_array_equal(list(anat_dict.keys()),
point_list)
# test the actual values of the voxels (no floating points)
for i, point in enumerate([(66, 51, 46), (41, 32, 74), (17, 53, 47)]):
coords = anat_dict[point_list[i]]
np.testing.assert_array_equal(np.asarray(coords, dtype=int),
point)
# BONUS: test also that we can find the matching sidecar
side_fname = _find_matching_sidecar('sub-01_ses-01_acq-01_T1w.nii.gz',
output_path, 'T1w.json')
assert op.split(side_fname)[-1] == 'sub-01_ses-01_acq-01_T1w.json'
# Now try some anat writing that will fail
# We already have some MRI data there
with pytest.raises(IOError, match='`overwrite` is set to False'):
write_anat(output_path, subject_id, t1w_mgh, session_id, acq,
raw=raw, trans=trans, verbose=True, deface=False,
overwrite=False)
# pass some invalid type as T1 MRI
with pytest.raises(ValueError, match='must be a path to a T1 weighted'):
write_anat(output_path, subject_id, 9999999999999, session_id, raw=raw,
trans=trans, verbose=True, deface=False, overwrite=True)
# Return without writing sidecar
sh.rmtree(anat_dir)
write_anat(output_path, subject_id, t1w_mgh, session_id)
# Assert that we truly cannot find a sidecar
with pytest.raises(RuntimeError, match='Did not find any'):
_find_matching_sidecar('sub-01_ses-01_acq-01_T1w.nii.gz',
output_path, 'T1w.json')
# trans has a wrong type
wrong_type = 1
match = 'transform type {} not known, must be'.format(type(wrong_type))
with pytest.raises(ValueError, match=match):
write_anat(output_path, subject_id, t1w_mgh, session_id, raw=raw,
trans=wrong_type, verbose=True, deface=False,
overwrite=True)
# trans is a str, but file does not exist
wrong_fname = 'not_a_trans'
match = 'trans file "{}" not found'.format(wrong_fname)
with pytest.raises(IOError, match=match):
write_anat(output_path, subject_id, t1w_mgh, session_id, raw=raw,
trans=wrong_fname, verbose=True, overwrite=True)
# However, reading trans if it is a string pointing to trans is fine
write_anat(output_path, subject_id, t1w_mgh, session_id, raw=raw,
trans=trans_fname, verbose=True, deface=False,
overwrite=True)
# Writing without a session does NOT yield "ses-None" anywhere
anat_dir2 = write_anat(output_path, subject_id, t1w_mgh, None)
assert 'ses-None' not in anat_dir2
assert op.exists(op.join(anat_dir2, 'sub-01_T1w.nii.gz'))
# specify trans but not raw
with pytest.raises(ValueError, match='must be specified if `trans`'):
write_anat(output_path, subject_id, t1w_mgh, session_id, raw=None,
trans=trans, verbose=True, deface=False, overwrite=True)
# test deface
anat_dir = write_anat(output_path, subject_id, t1w_mgh,
session_id, raw=raw, trans=trans_fname,
verbose=True, deface=True, overwrite=True)
t1w = nib.load(op.join(anat_dir, 'sub-01_ses-01_T1w.nii.gz'))
vox_sum = t1w.get_data().sum()
anat_dir2 = write_anat(output_path, subject_id, t1w_mgh,
session_id, raw=raw, trans=trans_fname,
verbose=True, deface=dict(inset=25.),
overwrite=True)
t1w2 = nib.load(op.join(anat_dir2, 'sub-01_ses-01_T1w.nii.gz'))
vox_sum2 = t1w2.get_data().sum()
assert vox_sum > vox_sum2
anat_dir3 = write_anat(output_path, subject_id, t1w_mgh,
session_id, raw=raw, trans=trans_fname,
verbose=True, deface=dict(theta=25),
overwrite=True)
t1w3 = nib.load(op.join(anat_dir3, 'sub-01_ses-01_T1w.nii.gz'))
vox_sum3 = t1w3.get_data().sum()
assert vox_sum > vox_sum3
with pytest.raises(ValueError,
match='The raw object, trans and raw must be provided'):
write_anat(output_path, subject_id, t1w_mgh, session_id, raw=raw,
trans=None, verbose=True, deface=True,
overwrite=True)
with pytest.raises(ValueError, match='inset must be numeric'):
write_anat(output_path, subject_id, t1w_mgh, session_id, raw=raw,
trans=trans, verbose=True, deface=dict(inset='small'),
overwrite=True)
with pytest.raises(ValueError, match='inset should be positive'):
write_anat(output_path, subject_id, t1w_mgh, session_id, raw=raw,
trans=trans, verbose=True, deface=dict(inset=-2.),
overwrite=True)
with pytest.raises(ValueError, match='theta must be numeric'):
write_anat(output_path, subject_id, t1w_mgh, session_id, raw=raw,
trans=trans, verbose=True, deface=dict(theta='big'),
overwrite=True)
with pytest.raises(ValueError,
match='theta should be between 0 and 90 degrees'):
write_anat(output_path, subject_id, t1w_mgh, session_id, raw=raw,
trans=trans, verbose=True, deface=dict(theta=100),
overwrite=True)
"""Make BIDS compatible directory structures and infer meta data from MNE."""
# Authors: Mainak Jas <mainak.jas@telecom-paristech.fr>
# Alexandre Gramfort <alexandre.gramfort@telecom-paristech.fr>
# Teon Brooks <teon.brooks@gmail.com>
# Chris Holdgraf <choldgraf@berkeley.edu>
# Stefan Appelhoff <stefan.appelhoff@mailbox.org>
# Matt Sanderson <matt.sanderson@mq.edu.au>
#
# License: BSD (3-clause)
import os
import os.path as op
from datetime import datetime, date, timedelta
from warnings import warn
import shutil as sh
from collections import defaultdict, OrderedDict
import numpy as np
from numpy.testing import assert_array_equal
from mne.transforms import (_get_trans, apply_trans, get_ras_to_neuromag_trans,
rotation, translation)
from mne import Epochs, events_from_annotations
from mne.io.constants import FIFF
from mne.io.pick import channel_type
from mne.io import BaseRaw, anonymize_info, _stamp_to_dt
from mne.channels.channels import _unit2human
from mne.utils import check_version, has_nibabel
from mne_bids.pick import coil_type
from mne_bids.utils import (_write_json, _write_tsv, _read_events, _mkdir_p,
_age_on_date, _infer_eeg_placement_scheme,
_check_key_val,
_parse_bids_filename, _handle_kind, _check_types,
_get_mrk_meas_date, _extract_landmarks, _parse_ext,
_get_ch_type_mapping)
from mne_bids.copyfiles import (copyfile_brainvision, copyfile_eeglab,
copyfile_ctf, copyfile_bti)
from mne_bids.read import reader
from mne_bids.tsv_handler import _from_tsv, _combine, _drop, _contains_row
from mne_bids.config import (ORIENTATION, UNITS, MANUFACTURERS,
IGNORED_CHANNELS, ALLOWED_EXTENSIONS,
BIDS_VERSION)
def _is_numeric(n):
return isinstance(n, (np.integer, np.floating, int, float))
def _channels_tsv(raw, fname, overwrite=False, verbose=True):
"""Create a channels.tsv file and save it.
Parameters
----------
raw : instance of Raw
The data as MNE-Python Raw object.
fname : str
Filename to save the channels.tsv to.
overwrite : bool
Whether to overwrite the existing file.
Defaults to False.
verbose : bool
Set verbose output to true or false.
"""
# Get channel type mappings between BIDS and MNE nomenclatures
map_chs = _get_ch_type_mapping(fro='mne', to='bids')
# Prepare the descriptions for each channel type
map_desc = defaultdict(lambda: 'Other type of channel')
map_desc.update(meggradaxial='Axial Gradiometer',
megrefgradaxial='Axial Gradiometer Reference',
meggradplanar='Planar Gradiometer',
megmag='Magnetometer',
megrefmag='Magnetometer Reference',
stim='Trigger',
eeg='ElectroEncephaloGram',
ecog='Electrocorticography',
seeg='StereoEEG',
ecg='ElectroCardioGram',
eog='ElectroOculoGram',
emg='ElectroMyoGram',
misc='Miscellaneous')
get_specific = ('mag', 'ref_meg', 'grad')
# get the manufacturer from the file in the Raw object
manufacturer = None
_, ext = _parse_ext(raw.filenames[0], verbose=verbose)
manufacturer = MANUFACTURERS[ext]
ignored_channels = IGNORED_CHANNELS.get(manufacturer, list())
status, ch_type, description = list(), list(), list()
for idx, ch in enumerate(raw.info['ch_names']):
status.append('bad' if ch in raw.info['bads'] else 'good')
_channel_type = channel_type(raw.info, idx)
if _channel_type in get_specific:
_channel_type = coil_type(raw.info, idx, _channel_type)
ch_type.append(map_chs[_channel_type])
description.append(map_desc[_channel_type])
low_cutoff, high_cutoff = (raw.info['highpass'], raw.info['lowpass'])
if raw._orig_units:
units = [raw._orig_units.get(ch, 'n/a') for ch in raw.ch_names]
else:
units = [_unit2human.get(ch_i['unit'], 'n/a')
for ch_i in raw.info['chs']]
units = [u if u not in ['NA'] else 'n/a' for u in units]
n_channels = raw.info['nchan']
sfreq = raw.info['sfreq']
ch_data = OrderedDict([
('name', raw.info['ch_names']),
('type', ch_type),
('units', units),
('low_cutoff', np.full((n_channels), low_cutoff)),
('high_cutoff', np.full((n_channels), high_cutoff)),
('description', description),
('sampling_frequency', np.full((n_channels), sfreq)),
('status', status)])
ch_data = _drop(ch_data, ignored_channels, 'name')
_write_tsv(fname, ch_data, overwrite, verbose)
return fname
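# For illustration, a hedged sketch of one channels.tsv row this function
# could produce for a good EEG channel sampled at 1000 Hz with a 0.1-100 Hz
# band (units depend on raw._orig_units, falling back to _unit2human):
#
#     name  type  units  low_cutoff  high_cutoff  description           sampling_frequency  status
#     EEG1  EEG   V      0.1         100.0        ElectroEncephaloGram  1000.0              good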
def _events_tsv(events, raw, fname, trial_type, overwrite=False,
verbose=True):
"""Create an events.tsv file and save it.
This function will write the mandatory 'onset' and 'duration' columns as
well as the optional 'value' and 'sample'. The 'value'
corresponds to the marker value as found in the TRIG channel of the
recording. In addition, the 'trial_type' field can be written.
Parameters
----------
events : array, shape = (n_events, 3)
The first column contains the event time in samples and the third
column contains the event id. The second column is ignored for now but
typically contains the value of the trigger channel either immediately
before the event or immediately after.
raw : instance of Raw
The data as MNE-Python Raw object.
fname : str
Filename to save the events.tsv to.
trial_type : dict | None
Dictionary mapping a brief description key to an event id (value). For
example {'Go': 1, 'No Go': 2}.
overwrite : bool
Whether to overwrite the existing file.
Defaults to False.
verbose : bool
Set verbose output to true or false.
Notes
-----
The function writes durations of zero for each event.
"""
# Start by filling all data that we know into an ordered dictionary
first_samp = raw.first_samp
sfreq = raw.info['sfreq']
events[:, 0] -= first_samp
# Onset column needs to be specified in seconds
data = OrderedDict([('onset', events[:, 0] / sfreq),
('duration', np.zeros(events.shape[0])),
('trial_type', None),
('value', events[:, 2]),
('sample', events[:, 0])])
# Now check if trial_type is specified or should be removed
if trial_type:
trial_type_map = {v: k for k, v in trial_type.items()}
data['trial_type'] = [trial_type_map.get(i, 'n/a') for
i in events[:, 2]]
else:
del data['trial_type']
_write_tsv(fname, data, overwrite, verbose)
return fname
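# For illustration (values are made up): with sfreq=1000.0, first_samp=100,
# a single event [1100, 0, 2] and trial_type={'No Go': 2}, the row written
# above would be roughly:
#
#     onset  duration  trial_type  value  sample
#     1.0    0.0       No Go       2      1000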
def _participants_tsv(raw, subject_id, fname, overwrite=False,
verbose=True):
"""Create a participants.tsv file and save it.
This will append any new participant data to the current list if it
exists. Otherwise a new file will be created with the provided information.
Parameters
----------
raw : instance of Raw
The data as MNE-Python Raw object.
subject_id : str
The subject name in BIDS compatible format ('01', '02', etc.)
fname : str
Filename to save the participants.tsv to.
overwrite : bool
Whether to overwrite the existing file.
Defaults to False.
If there is already data for the given `subject_id` and overwrite is
False, an error will be raised.
verbose : bool
Set verbose output to true or false.
"""
subject_id = 'sub-' + subject_id
data = OrderedDict(participant_id=[subject_id])
subject_age = "n/a"
sex = "n/a"
subject_info = raw.info['subject_info']
if subject_info is not None:
sexes = {0: 'n/a', 1: 'M', 2: 'F'}
sex = sexes[subject_info.get('sex', 0)]
# determine the age of the participant
age = subject_info.get('birthday', None)
meas_date = raw.info.get('meas_date', None)
if isinstance(meas_date, (tuple, list, np.ndarray)):
meas_date = meas_date[0]
if meas_date is not None and age is not None:
bday = datetime(age[0], age[1], age[2])
meas_datetime = datetime.fromtimestamp(meas_date)
subject_age = _age_on_date(bday, meas_datetime)
else:
subject_age = "n/a"
data.update({'age': [subject_age], 'sex': [sex]})
if os.path.exists(fname):
orig_data = _from_tsv(fname)
# whether the new data exists identically in the previous data
exact_included = _contains_row(orig_data,
{'participant_id': subject_id,
'age': subject_age,
'sex': sex})
# whether the subject id is in the previous data
sid_included = subject_id in orig_data['participant_id']
# if the subject data provided is different to the currently existing
# data and overwrite is not True raise an error
if (sid_included and not exact_included) and not overwrite:
raise FileExistsError('"%s" already exists in the participant ' # noqa: E501 F821
'list. Please set overwrite to '
'True.' % subject_id)
# otherwise add the new data
data = _combine(orig_data, data, 'participant_id')
# overwrite is forced to True as all issues with overwrite == False have
# been handled by this point
_write_tsv(fname, data, True, verbose)
return fname
def _participants_json(fname, overwrite=False, verbose=True):
"""Create participants.json for non-default columns in accompanying TSV.
Parameters
----------
fname : str
Filename to save the participants.json to.
overwrite : bool
Defaults to False.
Whether to overwrite the existing data in the file.
If there is already data for the given `fname` and overwrite is False,
an error will be raised.
verbose : bool
Set verbose output to true or false.
"""
cols = OrderedDict()
cols['participant_id'] = {'Description': 'Unique participant identifier'}
cols['age'] = {'Description': 'Age of the participant at time of testing',
'Units': 'years'}
cols['sex'] = {'Description': 'Biological sex of the participant',
'Levels': {'F': 'female', 'M': 'male'}}
_write_json(fname, cols, overwrite, verbose)
return fname
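# For illustration, the column descriptions written above always have this
# shape (exact formatting depends on _write_json):
#
#     {
#         "participant_id": {"Description": "Unique participant identifier"},
#         "age": {"Description": "Age of the participant at time of testing",
#                 "Units": "years"},
#         "sex": {"Description": "Biological sex of the participant",
#                 "Levels": {"F": "female", "M": "male"}}
#     }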
def _scans_tsv(raw, raw_fname, fname, overwrite=False, verbose=True):
"""Create a scans.tsv file and save it.
Parameters
----------
raw : instance of Raw
The data as MNE-Python Raw object.
raw_fname : str
Relative path to the raw data file.
fname : str
Filename to save the scans.tsv to.
overwrite : bool
Defaults to False.
Whether to overwrite the existing data in the file.
If there is already data for the given `fname` and overwrite is False,
an error will be raised.
verbose : bool
Set verbose output to true or false.
"""
# get measurement date from the data info
meas_date = raw.info['meas_date']
if isinstance(meas_date, (tuple, list, np.ndarray)):
meas_date = meas_date[0]
# windows datetime bug for timestamp < 0
# OSError [Errno 22] Invalid Argument
acq_time = \
(datetime.fromtimestamp(0) +
timedelta(seconds=int(meas_date))).strftime(
'%Y-%m-%dT%H:%M:%S')
else:
acq_time = 'n/a'
data = OrderedDict([('filename', ['%s' % raw_fname.replace(os.sep, '/')]),
('acq_time', [acq_time])])
if os.path.exists(fname):
orig_data = _from_tsv(fname)
# if the file name is already in the file raise an error
if raw_fname in orig_data['filename'] and not overwrite:
raise FileExistsError('"%s" already exists in the scans list. ' # noqa: E501 F821
'Please set overwrite to True.' % raw_fname)
# otherwise add the new data
data = _combine(orig_data, data, 'filename')
# overwrite is forced to True as all issues with overwrite == False have
# been handled by this point
_write_tsv(fname, data, True, verbose)
return fname
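# For illustration, a hedged sketch of one scans.tsv row written above; the
# filename is stored with forward slashes and acq_time falls back to 'n/a'
# when no measurement date is available:
#
#     filename                                         acq_time
#     meg/sub-01_ses-01_task-testing_run-01_meg.fif    %Y-%m-%dT%H:%M:%S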
def _coordsystem_json(raw, unit, orient, manufacturer, fname,
overwrite=False, verbose=True):
"""Create a coordsystem.json file and save it.
Parameters
----------
raw : instance of Raw
The data as MNE-Python Raw object.
unit : str
Units to be used in the coordsystem specification.
orient : str
Used to define the coordinate system for the head coils.
manufacturer : str
Used to define the coordinate system for the MEG sensors.
fname : str
Filename to save the coordsystem.json to.
overwrite : bool
Whether to overwrite the existing file.
Defaults to False.
verbose : bool
Set verbose output to true or false.
"""
dig = raw.info['dig']
coords = _extract_landmarks(dig)
hpi = {d['ident']: d for d in dig if d['kind'] == FIFF.FIFFV_POINT_HPI}
if hpi:
for ident in hpi.keys():
coords['coil%d' % ident] = hpi[ident]['r'].tolist()
coord_frame = set([dig[ii]['coord_frame'] for ii in range(len(dig))])
if len(coord_frame) > 1:
err = 'All HPI and Fiducials must be in the same coordinate frame.'
raise ValueError(err)
fid_json = {'MEGCoordinateSystem': manufacturer,
'MEGCoordinateUnits': unit, # XXX validate this
'HeadCoilCoordinates': coords,
'HeadCoilCoordinateSystem': orient,
'HeadCoilCoordinateUnits': unit # XXX validate this
}
_write_json(fname, fid_json, overwrite, verbose)
return fname
def _sidecar_json(raw, task, manufacturer, fname, kind, overwrite=False,
verbose=True):
"""Create a sidecar json file depending on the kind and save it.
The sidecar json file provides meta data about the data of a certain kind.
Parameters
----------
raw : instance of Raw
The data as MNE-Python Raw object.
task : str
Name of the task the data is based on.
manufacturer : str
Manufacturer of the acquisition system. For MEG also used to define the
coordinate system for the MEG sensors.
fname : str
Filename to save the sidecar json to.
kind : str
Type of the data as in ALLOWED_KINDS.
overwrite : bool
Whether to overwrite the existing file.
Defaults to False.
verbose : bool
Set verbose output to true or false. Defaults to true.
"""
sfreq = raw.info['sfreq']
powerlinefrequency = raw.info.get('line_freq', None)
if powerlinefrequency is None:
warn('No line frequency found, defaulting to 50 Hz')
powerlinefrequency = 50
if isinstance(raw, BaseRaw):
rec_type = 'continuous'
elif isinstance(raw, Epochs):
rec_type = 'epoched'
else:
rec_type = 'n/a'
# determine whether any channels have to be ignored:
n_ignored = len([ch_name for ch_name in
IGNORED_CHANNELS.get(manufacturer, list()) if
ch_name in raw.ch_names])
# all ignored channels are trigger channels at the moment...
n_megchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_MEG_CH])
n_megrefchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_REF_MEG_CH])
n_eegchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_EEG_CH])
n_ecogchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_ECOG_CH])
n_seegchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_SEEG_CH])
n_eogchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_EOG_CH])
n_ecgchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_ECG_CH])
n_emgchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_EMG_CH])
n_miscchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_MISC_CH])
n_stimchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_STIM_CH]) - n_ignored
# Define modality-specific JSON dictionaries
ch_info_json_common = [
('TaskName', task),
('Manufacturer', manufacturer),
('PowerLineFrequency', powerlinefrequency),
('SamplingFrequency', sfreq),
('SoftwareFilters', 'n/a'),
('RecordingDuration', raw.times[-1]),
('RecordingType', rec_type)]
ch_info_json_meg = [
('DewarPosition', 'n/a'),
('DigitizedLandmarks', False),
('DigitizedHeadPoints', False),
('MEGChannelCount', n_megchan),
('MEGREFChannelCount', n_megrefchan)]
ch_info_json_eeg = [
('EEGReference', 'n/a'),
('EEGGround', 'n/a'),
('EEGPlacementScheme', _infer_eeg_placement_scheme(raw)),
('Manufacturer', manufacturer)]
ch_info_json_ieeg = [
('iEEGReference', 'n/a'),
('ECOGChannelCount', n_ecogchan),
('SEEGChannelCount', n_seegchan)]
ch_info_ch_counts = [
('EEGChannelCount', n_eegchan),
('EOGChannelCount', n_eogchan),
('ECGChannelCount', n_ecgchan),
('EMGChannelCount', n_emgchan),
('MiscChannelCount', n_miscchan),
('TriggerChannelCount', n_stimchan)]
# Stitch together the complete JSON dictionary
ch_info_json = ch_info_json_common
if kind == 'meg':
append_kind_json = ch_info_json_meg
elif kind == 'eeg':
append_kind_json = ch_info_json_eeg
elif kind == 'ieeg':
append_kind_json = ch_info_json_ieeg
ch_info_json += append_kind_json
ch_info_json += ch_info_ch_counts
ch_info_json = OrderedDict(ch_info_json)
_write_json(fname, ch_info_json, overwrite, verbose)
return fname
def _deface(t1w, mri_landmarks, deface, trans, raw):
if not has_nibabel(): # pragma: no cover
raise ImportError('This function requires nibabel.')
import nibabel as nib
inset, theta = (20, 35.)
if isinstance(deface, dict):
if 'inset' in deface:
inset = deface['inset']
if 'theta' in deface:
theta = deface['theta']
if not _is_numeric(inset):
raise ValueError('inset must be numeric (float, int). '
'Got %s' % type(inset))
if not _is_numeric(theta):
raise ValueError('theta must be numeric (float, int). '
'Got %s' % type(theta))
if inset < 0:
raise ValueError('inset should be positive, '
'Got %s' % inset)
if not 0 < theta < 90:
raise ValueError('theta should be between 0 and 90 '
'degrees. Got %s' % theta)
# x: L/R L+, y: S/I I+, z: A/P A+
t1w_data = t1w.get_data().copy()
idxs_vox = np.meshgrid(np.arange(t1w_data.shape[0]),
np.arange(t1w_data.shape[1]),
np.arange(t1w_data.shape[2]),
indexing='ij')
idxs_vox = np.array(idxs_vox) # (3, *t1w_data.shape)
idxs_vox = np.transpose(idxs_vox,
[1, 2, 3, 0]) # (*t1w_data.shape, 3)
idxs_vox = idxs_vox.reshape(-1, 3) # (n_voxels, 3)
mri_landmarks_ras = apply_trans(t1w.affine, mri_landmarks)
ras_meg_t = \
get_ras_to_neuromag_trans(*mri_landmarks_ras[[1, 0, 2]])
idxs_ras = apply_trans(t1w.affine, idxs_vox)
idxs_meg = apply_trans(ras_meg_t, idxs_ras)
# now comes the actual defacing
# 1. move center of voxels to (nasion - inset)
# 2. rotate the head by theta from the normal to the plane passing
# through anatomical coordinates
trans_y = -mri_landmarks_ras[1, 1] + inset
idxs_meg = apply_trans(translation(y=trans_y), idxs_meg)
idxs_meg = apply_trans(rotation(x=-np.deg2rad(theta)), idxs_meg)
coords = idxs_meg.reshape(t1w.shape + (3,)) # (*t1w_data.shape, 3)
mask = (coords[..., 2] < 0) # z < 0
t1w_data[mask] = 0.
# smoothing was decided against due to potential lack of anonymization
# https://gist.github.com/alexrockhill/15043928b716a432db3a84a050b241ae
t1w = nib.Nifti1Image(t1w_data, t1w.affine, t1w.header)
return t1w
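# Hedged recap of the defacing steps above: (1) voxel centres are shifted
# along y relative to (nasion - inset), (2) the cloud is rotated by `theta`
# degrees about the x axis, and (3) voxels that end up at z < 0 are zeroed.
# deface=True uses the defaults inset=20, theta=35.; a dict such as
# deface=dict(inset=25., theta=15.) overrides them.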
def _write_raw_fif(raw, bids_fname):
"""Save out the raw file in FIF.
Parameters
----------
raw : mne.io.Raw
Raw file to save out.
bids_fname : str
The name of the BIDS-specified file where the raw object
should be saved.
"""
n_rawfiles = len(raw.filenames)
if n_rawfiles > 1:
split_naming = 'bids'
raw.save(bids_fname, split_naming=split_naming, overwrite=True)
else:
# This ensures that single FIF files do not have the part param
raw.save(bids_fname, split_naming='neuromag', overwrite=True)
def _write_raw_brainvision(raw, bids_fname):
"""Save out the raw file in BrainVision format.
Parameters
----------
raw : mne.io.Raw
Raw file to save out.
bids_fname : str
The name of the BIDS-specified file where the raw object
should be saved.
"""
if not check_version('pybv', '0.2'):
raise ImportError('pybv >=0.2.0 is required for converting ' +
'files to BrainVision format')
from pybv import write_brainvision
events, event_id = events_from_annotations(raw)
write_brainvision(raw.get_data(), raw.info['sfreq'],
raw.ch_names,
op.splitext(op.basename(bids_fname))[0],
op.dirname(bids_fname), events[:, [0, 2]],
resolution=1e-6)
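# For illustration (a hedged sketch, the path is made up): calling
#
#     _write_raw_brainvision(raw, '/bids/sub-01/eeg/sub-01_task-rest_eeg.vhdr')
#
# should write sub-01_task-rest_eeg.vhdr/.vmrk/.eeg into /bids/sub-01/eeg,
# with events taken from the raw annotations and data stored at 1 microvolt
# resolution.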
def make_bids_basename(subject=None, session=None, task=None,
acquisition=None, run=None, processing=None,
recording=None, space=None, prefix=None, suffix=None):
"""Create a partial/full BIDS filename from its component parts.
BIDS filename prefixes have one or more pieces of metadata in them. They
must follow a particular order, which is followed by this function. This
will generate the *prefix* for a BIDS filename that can be used with many
subsequent files, or you may also give a suffix that will then complete
the file name.
Note that not all parameters are applicable to every kind of data. For
example, electrode location TSV files do not need a task field.
Parameters
----------
subject : str | None
The subject ID. Corresponds to "sub".
session : str | None
The session for this item. Corresponds to "ses".
task : str | None
The task for this item. Corresponds to "task".
acquisition : str | None
The acquisition parameters for the item. Corresponds to "acq".
run : int | None
The run number for this item. Corresponds to "run".
processing : str | None
The processing label for this item. Corresponds to "proc".
recording : str | None
The recording name for this item. Corresponds to "recording".
space : str | None
The coordinate space for an anatomical file. Corresponds to "space".
prefix : str | None
The prefix for the filename to be created. E.g., a path to the folder
in which you wish to create a file with this name.
suffix : str | None
The suffix for the filename to be created. E.g., 'audio.wav'.
Returns
-------
filename : str
The BIDS filename you wish to create.
Examples
--------
>>> print(make_bids_basename(subject='test', session='two', task='mytask', suffix='data.csv')) # noqa: E501
sub-test_ses-two_task-mytask_data.csv
"""
order = OrderedDict([('sub', subject),
('ses', session),
('task', task),
('acq', acquisition),
('run', run),
('proc', processing),
('space', space),
('recording', recording)])
if order['run'] is not None and not isinstance(order['run'], str):
# Ensure that run is a string
order['run'] = '{:02}'.format(order['run'])
_check_types(order.values())
if (all(ii is None for ii in order.values()) and suffix is None and
prefix is None):
raise ValueError("At least one parameter must be given.")
filename = []
for key, val in order.items():
if val is not None:
_check_key_val(key, val)
filename.append('%s-%s' % (key, val))
if isinstance(suffix, str):
filename.append(suffix)
filename = '_'.join(filename)
if isinstance(prefix, str):
filename = op.join(prefix, filename)
return filename
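# Additional hedged usage sketch (values are arbitrary): an integer run is
# zero-padded and a prefix becomes a leading path, so
#
#     make_bids_basename(subject='01', run=2, prefix='/data/bids',
#                        suffix='eeg.vhdr')
#
# should return '/data/bids/sub-01_run-02_eeg.vhdr'.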
def make_bids_folders(subject, session=None, kind=None, output_path=None,
make_dir=True, overwrite=False, verbose=False):
"""Create a BIDS folder hierarchy.
This creates a hierarchy of folders *within* a BIDS dataset. You should
plan to create these folders *inside* the output_path folder of the dataset.
Parameters
----------
subject : str
The subject ID. Corresponds to "sub".
kind : str
The kind of folder being created at the end of the hierarchy. E.g.,
"anat", "func", etc.
session : str | None
The session for this item. Corresponds to "ses".
output_path : str | None
The output_path for the folders to be created. If None, folders will be
created in the current working directory.
make_dir : bool
Whether to actually create the folders specified. If False, only a
path will be generated but no folders will be created.
overwrite : bool
How to handle overwriting previously generated data.
If overwrite == False then no existing folders will be removed, however
if overwrite == True then any existing folders at the session level
or lower will be removed, including any contained data.
verbose : bool
If verbose is True, print status updates
as folders are created.
Returns
-------
path : str
The (relative) path to the folder that was created.
Examples
--------
>>> print(make_bids_folders('sub_01', session='my_session',
kind='meg', output_path='path/to/project',
make_dir=False)) # noqa
path/to/project/sub-sub_01/ses-my_session/meg
"""
_check_types((subject, kind, session, output_path))
if session is not None:
_check_key_val('ses', session)
path = ['sub-%s' % subject]
if isinstance(session, str):
path.append('ses-%s' % session)
if isinstance(kind, str):
path.append(kind)
path = op.join(*path)
if isinstance(output_path, str):
path = op.join(output_path, path)
if make_dir is True:
_mkdir_p(path, overwrite=overwrite, verbose=verbose)
return path
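# Additional hedged sketch: with make_dir=False this is purely a path builder,
# for example
#
#     make_bids_folders('01', session='01', kind='meg',
#                       output_path='/data/bids', make_dir=False)
#
# should return '/data/bids/sub-01/ses-01/meg' without touching the disk.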
def make_dataset_description(path, name=None, data_license=None,
authors=None, acknowledgements=None,
how_to_acknowledge=None, funding=None,
references_and_links=None, doi=None,
verbose=False):
"""Create json for a dataset description.
BIDS datasets may have one or more fields, this function allows you to
specify which you wish to include in the description. See the BIDS
documentation for information about what each field means.
Parameters
----------
path : str
A path to a folder where the description will be created.
name : str | None
The name of this BIDS dataset.
data_license : str | None
The license under which this dataset is published.
authors : list | str | None
List of individuals who contributed to the creation/curation of the
dataset. Must be a list of strings (e.g., ['a', 'b', 'c']) or a single
comma-separated string (e.g., 'a, b, c').
acknowledgements : list | str | None
Either a str acknowledging individuals who contributed to the
creation/curation of this dataset OR a list of the individuals'
names as str.
how_to_acknowledge : list | str | None
Either a str describing how to acknowledge this dataset OR a list of
publications that should be cited.
funding : list | str | None
List of sources of funding (e.g., grant numbers). Must be a list of
strings (e.g., ['a', 'b', 'c']) or a single comma-separated string
(e.g., 'a, b, c').
references_and_links : list | str | None
List of references to publication that contain information on the
dataset, or links. Must be a list of strings (e.g., ['a', 'b', 'c']) or a
single comma-separated string (e.g., 'a, b, c').
doi : str | None
The DOI for the dataset.
Notes
-----
The required field BIDSVersion will be automatically filled by mne_bids.
"""
# Put potential string input into list of strings
if isinstance(authors, str):
authors = authors.split(', ')
if isinstance(funding, str):
funding = funding.split(', ')
if isinstance(references_and_links, str):
references_and_links = references_and_links.split(', ')
fname = op.join(path, 'dataset_description.json')
description = OrderedDict([('Name', name),
('BIDSVersion', BIDS_VERSION),
('License', data_license),
('Authors', authors),
('Acknowledgements', acknowledgements),
('HowToAcknowledge', how_to_acknowledge),
('Funding', funding),
('ReferencesAndLinks', references_and_links),
('DatasetDOI', doi)])
pop_keys = [key for key, val in description.items() if val is None]
for key in pop_keys:
description.pop(key)
_write_json(fname, description, overwrite=True, verbose=verbose)
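# Hedged usage sketch: only the non-None fields end up in the JSON and
# BIDSVersion is always added, so a call like
#
#     make_dataset_description('/data/bids', name='My dataset',
#                              authors='A. One, B. Two')
#
# should write /data/bids/dataset_description.json containing Name,
# BIDSVersion and Authors (the comma-separated string is split into a list).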
def write_raw_bids(raw, bids_basename, output_path, events_data=None,
event_id=None, anonymize=None, convert=None,
overwrite=False, verbose=True):
"""Walk over a folder of files and create BIDS compatible folder.
.. warning:: The original files are simply copied over if the original
file format is BIDS-supported for that modality. Otherwise,
this function will convert to a BIDS-supported file format
while warning the user. For EEG and iEEG data, conversion will
be to BrainVision format, for MEG conversion will be to FIF.
Parameters
----------
raw : instance of mne.io.Raw
The raw data. It must be an instance of mne.io.Raw. The data should not
be preloaded into memory, i.e., raw.preload must be False.
bids_basename : str
The base filename of the BIDS compatible files. Typically, this can be
generated using make_bids_basename.
Example: `sub-01_ses-01_task-testing_acq-01_run-01`.
This will write the following files in the correct subfolder of the
output_path::
sub-01_ses-01_task-testing_acq-01_run-01_meg.fif
sub-01_ses-01_task-testing_acq-01_run-01_meg.json
sub-01_ses-01_task-testing_acq-01_run-01_channels.tsv
        sub-01_ses-01_acq-01_coordsystem.json
and the following one if events_data is not None::
sub-01_ses-01_task-testing_acq-01_run-01_events.tsv
and add a line to the following files::
participants.tsv
scans.tsv
Note that the modality 'meg' is automatically inferred from the raw
object and extension '.fif' is copied from raw.filenames.
output_path : str
The path of the root of the BIDS compatible folder. The session and
subject specific folders will be populated automatically by parsing
bids_basename.
events_data : str | array | None
The events file. If a string, a path to the events file. If an array,
the MNE events array (shape n_events, 3). If None, events will be
inferred from the stim channel using `mne.find_events`.
event_id : dict | None
The event id dict used to create a 'trial_type' column in events.tsv
anonymize : dict | None
If a dictionary is passed, data will be anonymized; identifying data
structures such as study date and time will be changed.
`daysback` is a required argument and `keep_his` is optional, these
arguments are passed to :func:`mne.io.anonymize_info`.
`daysback` : int
Number of days to move back the date. To keep relative dates
            for a subject use the same `daysback`. According to the BIDS
            specification, the number of days back must be large enough
            that the date is before 1925; thus `daysback` will be subtracted
from Dec 31, 1924.
`keep_his` : bool
            If True, the his_id of subject_info will NOT be overwritten.
Defaults to False.
convert : bool | None
Whether or not to save in BrainVision for EEG or iEEG data and FIF
for MEG data if the data file is not already in that format.
        Defaults to None, in which case conversion is done only if
        the file type is not BIDS-compatible (e.g. an EEG-only FIF file).
        If anonymize is used, the file is always converted.
overwrite : bool
Whether to overwrite existing files or data in files.
Defaults to False.
If overwrite is True, any existing files with the same BIDS parameters
will be overwritten with the exception of the `participants.tsv` and
`scans.tsv` files. For these files, parts of pre-existing data that
match the current data will be replaced.
If overwrite is False, no existing data will be overwritten or
replaced.
verbose : bool
If verbose is True, this will print a snippet of the sidecar files. If
False, no content will be printed.
Returns
-------
output_path : str
The path of the root of the BIDS compatible folder.
Notes
-----
    For the participants.tsv file, raw.info['subject_info'] should be
updated and raw.info['meas_date'] should not be None to compute the age
of the participant correctly.
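    Examples
    --------
    A minimal illustrative call (the output path and entity labels are
    hypothetical); `anonymize=dict(daysback=600)` shifts the recording date
    to before 1925 as required by BIDS::

        bids_basename = make_bids_basename(subject='01', session='01',
                                           task='testing', run='01')
        write_raw_bids(raw, bids_basename, '/data/my_bids',
                       anonymize=dict(daysback=600), overwrite=True)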
"""
if not check_version('mne', '0.17'):
raise ValueError('Your version of MNE is too old. '
'Please update to 0.17 or newer.')
if not isinstance(raw, BaseRaw):
raise ValueError('raw_file must be an instance of BaseRaw, '
'got %s' % type(raw))
if not hasattr(raw, 'filenames') or raw.filenames[0] is None:
        raise ValueError('raw.filenames is missing. Please set '
                         'raw.filenames as a list with the full path of '
                         'the original raw file.')
if raw.preload is not False:
raise ValueError('The data should not be preloaded.')
raw = raw.copy()
raw_fname = raw.filenames[0]
if '.ds' in op.dirname(raw.filenames[0]):
raw_fname = op.dirname(raw.filenames[0])
# point to file containing header info for multifile systems
raw_fname = raw_fname.replace('.eeg', '.vhdr')
raw_fname = raw_fname.replace('.fdt', '.set')
_, ext = _parse_ext(raw_fname, verbose=verbose)
raw_orig = reader[ext](**raw._init_kwargs)
assert_array_equal(raw.times, raw_orig.times,
"raw.times should not have changed since reading"
" in from the file. It may have been cropped.")
params = _parse_bids_filename(bids_basename, verbose)
subject_id, session_id = params['sub'], params['ses']
acquisition, task, run = params['acq'], params['task'], params['run']
kind = _handle_kind(raw)
bids_fname = bids_basename + '_%s%s' % (kind, ext)
# check whether the info provided indicates that the data is emptyroom
# data
emptyroom = False
if subject_id == 'emptyroom' and task == 'noise':
emptyroom = True
# check the session date provided is consistent with the value in raw
meas_date = raw.info.get('meas_date', None)
if meas_date is not None:
er_date = datetime.fromtimestamp(
raw.info['meas_date'][0]).strftime('%Y%m%d')
if er_date != session_id:
raise ValueError("Date provided for session doesn't match "
"session date.")
data_path = make_bids_folders(subject=subject_id, session=session_id,
kind=kind, output_path=output_path,
overwrite=False, verbose=verbose)
if session_id is None:
ses_path = os.sep.join(data_path.split(os.sep)[:-1])
else:
ses_path = make_bids_folders(subject=subject_id, session=session_id,
output_path=output_path, make_dir=False,
overwrite=False, verbose=verbose)
# create filenames
scans_fname = make_bids_basename(
subject=subject_id, session=session_id, suffix='scans.tsv',
prefix=ses_path)
participants_tsv_fname = make_bids_basename(prefix=output_path,
suffix='participants.tsv')
participants_json_fname = make_bids_basename(prefix=output_path,
suffix='participants.json')
coordsystem_fname = make_bids_basename(
subject=subject_id, session=session_id, acquisition=acquisition,
suffix='coordsystem.json', prefix=data_path)
sidecar_fname = make_bids_basename(
subject=subject_id, session=session_id, task=task, run=run,
acquisition=acquisition, suffix='%s.json' % kind, prefix=data_path)
events_fname = make_bids_basename(
subject=subject_id, session=session_id, task=task,
acquisition=acquisition, run=run, suffix='events.tsv',
prefix=data_path)
channels_fname = make_bids_basename(
subject=subject_id, session=session_id, task=task, run=run,
acquisition=acquisition, suffix='channels.tsv', prefix=data_path)
if ext not in ['.fif', '.ds', '.vhdr', '.edf', '.bdf', '.set', '.con',
'.sqd']:
bids_raw_folder = bids_fname.split('.')[0]
bids_fname = op.join(bids_raw_folder, bids_fname)
# Anonymize
if anonymize:
if 'daysback' not in anonymize:
raise ValueError('`daysback` argument required to anonymize.')
daysback = anonymize['daysback']
if daysback < 0:
            raise ValueError('`daysback` must be a non-negative number')
# this is for ctf and set which have meas_date = None in testing data
if raw.info['meas_date'] is None:
raw.info['meas_date'] = (np.int32(0), np.int32(0))
min_secondsback = (_stamp_to_dt(raw.info['meas_date']).date() -
date(year=1924, month=12, day=31)).total_seconds()
# 86400 == seconds in a day
min_daysback = np.ceil(min_secondsback / 86400)
daysback += min_daysback
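        # Illustrative arithmetic (hypothetical dates): for a recording made
        # on 2010-01-01, min_daysback is roughly 31047 (the number of days
        # back to 1924-12-31), so a user-supplied daysback of 600 becomes an
        # effective shift of roughly 31647 days, i.e. about 600 days earlier
        # than 1924-12-31.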
new_date = (_stamp_to_dt(raw.info['meas_date']).date() -
timedelta(days=daysback))
seconds_from_0 = (datetime.fromtimestamp(0).date() - new_date
).total_seconds()
if abs(seconds_from_0) > np.iinfo('>i4').max:
max_val = np.ceil((int(raw.info['meas_date'][0]) -
np.iinfo('>i4').min) / 86400) - min_daysback
raise ValueError('`daysback` is too large, maximum value: %i' %
max_val)
keep_his = anonymize['keep_his'] if 'keep_his' in anonymize else False
raw.info = anonymize_info(raw.info, daysback=daysback,
keep_his=keep_his)
        if kind == 'meg' and ext != '.fif':
if verbose:
warn('Converting to FIF for anonymization')
bids_fname = bids_fname.replace(ext, '.fif')
# Read in Raw object and extract metadata from Raw object if needed
orient = ORIENTATION.get(ext, 'n/a')
unit = UNITS.get(ext, 'n/a')
manufacturer = MANUFACTURERS.get(ext, 'n/a')
# save all meta data
_participants_tsv(raw, subject_id, participants_tsv_fname, overwrite,
verbose)
_participants_json(participants_json_fname, True, verbose)
_scans_tsv(raw, op.join(kind, bids_fname), scans_fname, overwrite, verbose)
    # TODO: Implement coordsystem.json and electrodes.tsv for EEG and iEEG
if kind == 'meg' and not emptyroom:
_coordsystem_json(raw, unit, orient, manufacturer, coordsystem_fname,
overwrite, verbose)
events, event_id = _read_events(events_data, event_id, raw, ext)
if events is not None and len(events) > 0 and not emptyroom:
_events_tsv(events, raw, events_fname, event_id, overwrite, verbose)
make_dataset_description(output_path, name=" ", verbose=verbose)
_sidecar_json(raw, task, manufacturer, sidecar_fname, kind, overwrite,
verbose)
_channels_tsv(raw, channels_fname, overwrite, verbose)
# set the raw file name to now be the absolute path to ensure the files
# are placed in the right location
bids_fname = op.join(data_path, bids_fname)
if os.path.exists(bids_fname) and not overwrite:
raise FileExistsError('"%s" already exists. Please set ' # noqa: F821
'overwrite to True.' % bids_fname)
_mkdir_p(os.path.dirname(bids_fname))
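    # By default, only convert when the extension is not BIDS-compatible for
    # this kind (e.g. an EEG-only FIF file); anonymization always requires
    # rewriting the data file, so those branches below write out new files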
if convert is None:
convert = ext not in ALLOWED_EXTENSIONS[kind]
if not (convert or anonymize) and verbose:
print('Copying data files to %s' % op.splitext(bids_fname)[0])
# File saving branching logic
if convert or anonymize:
if kind == 'meg':
if ext == '.pdf':
bids_fname = op.join(data_path, op.basename(bids_fname))
if anonymize:
# delete to avoid 32-bit errors
# https://github.com/bids-standard/bids-specification/issues/360
raw.info['meas_date'] = (np.int32(0), np.int32(0))
_write_raw_fif(raw, bids_fname)
else:
if verbose:
warn('Converting data files to BrainVision format')
_write_raw_brainvision(raw, bids_fname)
elif ext == '.fif':
_write_raw_fif(raw, bids_fname)
# CTF data is saved and renamed in a directory
elif ext == '.ds':
copyfile_ctf(raw_fname, bids_fname)
# BrainVision is multifile, copy over all of them and fix pointers
elif ext == '.vhdr':
copyfile_brainvision(raw_fname, bids_fname)
# EEGLAB .set might be accompanied by a .fdt - find out and copy it too
elif ext == '.set':
copyfile_eeglab(raw_fname, bids_fname)
elif ext == '.pdf':
copyfile_bti(raw_orig, op.join(data_path, bids_raw_folder))
else:
sh.copyfile(raw_fname, bids_fname)
# KIT data requires the marker file to be copied over too
if 'mrk' in raw._init_kwargs:
hpi = raw._init_kwargs['mrk']
acq_map = dict()
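        # KIT recordings can come with two marker files (measured before and
        # after the run); they are mapped onto the BIDS 'acq' entity as
        # acq-pre and acq-post below, while a single marker file gets no
        # acq entity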
if isinstance(hpi, list):
if _get_mrk_meas_date(hpi[0]) > _get_mrk_meas_date(hpi[1]):
raise ValueError('Markers provided in incorrect order.')
_, marker_ext = _parse_ext(hpi[0])
acq_map = dict(zip(['pre', 'post'], hpi))
else:
_, marker_ext = _parse_ext(hpi)
acq_map[None] = hpi
for key, value in acq_map.items():
marker_fname = make_bids_basename(
subject=subject_id, session=session_id, task=task, run=run,
acquisition=key, suffix='markers%s' % marker_ext,
prefix=data_path)
sh.copyfile(value, marker_fname)
return output_path


def write_anat(bids_root, subject, t1w, session=None, acquisition=None,
               raw=None, trans=None, deface=False, overwrite=False,
               verbose=False):
"""Put anatomical MRI data into a BIDS format.
Given a BIDS directory and a T1 weighted MRI scan for a certain subject,
format the MRI scan to be in BIDS format and put it into the correct
    location in the `bids_root`. If a transformation matrix is supplied, a
sidecar JSON file will be written for the T1 weighted data.
Parameters
----------
bids_root : str
Path to root of the BIDS folder
subject : str
Subject label as in 'sub-<label>', for example: '01'
t1w : str | nibabel image object
Path to a T1 weighted MRI scan of the subject. Can be in any format
readable by nibabel. Can also be a nibabel image object of a T1
weighted MRI scan. Will be written as a .nii.gz file.
    session : str | None
        The session for `t1w`. Corresponds to "ses".
    acquisition : str | None
        The acquisition parameters for `t1w`. Corresponds to "acq".
raw : instance of Raw | None
The raw data of `subject` corresponding to `t1w`. If `raw` is None,
`trans` has to be None as well
trans : instance of mne.transforms.Transform | str | None
The transformation matrix from head coordinates to MRI coordinates. Can
also be a string pointing to a .trans file containing the
transformation matrix. If None, no sidecar JSON file will be written
for `t1w`
deface : bool | dict
If False, no defacing is performed.
If True, deface with default parameters.
`trans` and `raw` must not be `None` if True.
If dict, accepts the following keys:
`inset`: how far back in millimeters to start defacing
relative to the nasion (default 20)
`theta`: is the angle of the defacing shear in degrees relative
to the normal to the plane passing through the anatomical
landmarks (default 35).
overwrite : bool
Whether to overwrite existing files or data in files.
Defaults to False.
If overwrite is True, any existing files with the same BIDS parameters
will be overwritten with the exception of the `participants.tsv` and
`scans.tsv` files. For these files, parts of pre-existing data that
match the current data will be replaced.
If overwrite is False, no existing data will be overwritten or
replaced.
verbose : bool
If verbose is True, this will print a snippet of the sidecar files. If
False, no content will be printed.
Returns
-------
anat_dir : str
        Path to the anatomical scan in the `bids_root`.
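    Examples
    --------
    An illustrative call (paths are hypothetical); `trans` is an MNE
    head-to-MRI transform (or a path to one) used to write the anatomical
    landmarks and, here, to deface the scan::

        write_anat('/data/my_bids', subject='01', session='01',
                   t1w='/data/sub01_T1.nii.gz', raw=raw, trans=trans,
                   deface=True, overwrite=True)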
"""
if not has_nibabel(): # pragma: no cover
raise ImportError('This function requires nibabel.')
import nibabel as nib
if deface and (trans is None or raw is None):
        raise ValueError('Both `raw` and `trans` must be provided to deface '
                         'the T1')
# Make directory for anatomical data
anat_dir = op.join(bids_root, 'sub-{}'.format(subject))
# Session is optional
if session is not None:
anat_dir = op.join(anat_dir, 'ses-{}'.format(session))
anat_dir = op.join(anat_dir, 'anat')
if not op.exists(anat_dir):
os.makedirs(anat_dir)
# Try to read our T1 file and convert to MGH representation
if isinstance(t1w, str):
t1w = nib.load(t1w)
elif type(t1w) not in nib.all_image_classes:
raise ValueError('`t1w` must be a path to a T1 weighted MRI data file '
', or a nibabel image object, but it is of type '
'"{}"'.format(type(t1w)))
t1w = nib.Nifti1Image(t1w.dataobj, t1w.affine)
    # If unset, mark the units as mm and seconds
    # (NIFTI_UNITS_MM | NIFTI_UNITS_SEC = 2 | 8 = 10), which seems to be the
    # default for NIfTI files:
    # https://nifti.nimh.nih.gov/nifti-1/documentation/nifti1fields/nifti1fields_pages/xyzt_units.html
if t1w.header['xyzt_units'] == 0:
t1w.header['xyzt_units'] = np.array(10, dtype='uint8')
# Now give the NIfTI file a BIDS name and write it to the BIDS location
t1w_basename = make_bids_basename(subject=subject, session=session,
acquisition=acquisition, prefix=anat_dir,
suffix='T1w.nii.gz')
# Check if we have necessary conditions for writing a sidecar JSON
if trans is not None:
# get trans and ensure it is from head to MRI
trans, _ = _get_trans(trans, fro='head', to='mri')
if not isinstance(raw, BaseRaw):
raise ValueError('`raw` must be specified if `trans` is not None')
# Prepare to write the sidecar JSON
# extract MEG landmarks
coords_dict = _extract_landmarks(raw.info['dig'])
meg_landmarks = np.asarray((coords_dict['LPA'],
coords_dict['NAS'],
coords_dict['RPA']))
        # Transform MEG landmarks into MRI space; convert from m to mm (* 1e3)
mri_landmarks = apply_trans(trans, meg_landmarks, move=True) * 1e3
# Get landmarks in voxel space, using the mgh version of our T1 data
t1_mgh = nib.MGHImage(t1w.dataobj, t1w.affine)
vox2ras_tkr = t1_mgh.header.get_vox2ras_tkr()
ras2vox_tkr = np.linalg.inv(vox2ras_tkr)
mri_landmarks = apply_trans(ras2vox_tkr, mri_landmarks) # in vox
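        # At this point the landmarks have been taken from head coordinates
        # (meters) to MRI surface RAS (mm) and finally to voxel indices of
        # the T1, which is the frame used in the sidecar JSON below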
# Write sidecar.json
t1w_json = dict()
t1w_json['AnatomicalLandmarkCoordinates'] = \
{'LPA': list(mri_landmarks[0, :]),
'NAS': list(mri_landmarks[1, :]),
'RPA': list(mri_landmarks[2, :])}
fname = t1w_basename.replace('.nii.gz', '.json')
if op.isfile(fname) and not overwrite:
raise IOError('Wanted to write a file but it already exists and '
'`overwrite` is set to False. File: "{}"'
.format(fname))
_write_json(fname, t1w_json, overwrite, verbose)
if deface:
t1w = _deface(t1w, mri_landmarks, deface, trans, raw)
# Save anatomical data
if op.exists(t1w_basename):
if overwrite:
os.remove(t1w_basename)
else:
raise IOError('Wanted to write a file but it already exists and '
'`overwrite` is set to False. File: "{}"'
.format(t1w_basename))
nib.save(t1w, t1w_basename)
return anat_dir