Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
MRtrix3 stand-alone Python script for generating connectivity fingerprints within a ROI
#!/usr/bin/env python
# NOTE: It is recommended to set the MRtrix config file entry TckgenEarlyExit to true
# before running this script: This should prevent the script from hanging on
# voxels where it is too difficult to generate streamlines.
# Typical pre-processing required before running this script:
# 1. FOD: Standard DWI pre-processing
# 2. 5TT: e.g. 5ttgen fsl
# 3. Parcellation:
# 3.1. Run FreeSurfer
# 3.2. labelconvert - Map large indices provided in FreeSurfer image output to indices that increase from 1
# 3.3. labelsgmfix - To improve parcellation of sub-cortical grey matter structures
# 4. Mask:
# 4.1. FSL FIRST (run_first_all) - segment structure(s) of interest only
# 4.2. meshconvert - Convert .vtk mesh file from FIRST convention to realspace coordinates
# 4.2. mesh2pve - Partial volume fractions for each voxel in a template image that lie within the mesh
# 4.3. mrthreshold -abs 0.5 - Convert partial volume fraction image to a mask
import math, os
from mrtrix3 import app, image, path, run
app.init('Robert E. Smith (robert.smith@florey.edu.au)',
'Parcellate a structure based on the projection of streamlines to labelled nodes')
app.cmdline.add_argument('input_fod', help='The input FODs')
app.cmdline.add_argument('input_5tt', help='The input 5TT image for ACT')
app.cmdline.add_argument('input_parc', help='The input parcellation image')
app.cmdline.add_argument('input_mask', help='The mask of the structure to be parcellated')
app.cmdline.add_argument('output_voxels', help='The list of voxels for which fingerprints were successfully generated')
app.cmdline.add_argument('output_data', help='The connectivity fingerprint of each successfully-processed voxel')
options = app.cmdline.add_argument_group('Options for the maskcbp script')
options.add_argument('-tckgen_options', help='Options to pass to the tckgen command (remember to wrap in quotation marks)')
options.add_argument('-grad_image', help='Calculate an image representing gradients in connectivity fingerprints')
app.parse()
# Commented since test data was generated using an older 5ttge fsl script that had a small error
run.command('5ttcheck ' + path.fromUser(app.args.input_5tt, True))
if len(image.Header(path.fromUser(app.args.input_fod, False)).size()) != 4:
app.error('Input FOD image must be a 4D image')
app.checkOutputPath(path.fromUser(app.args.output_voxels, False))
app.checkOutputPath(path.fromUser(app.args.output_data, False))
if app.args.grad_image:
app.checkOutputPath(path.fromUser(app.args.grad_image, False))
tckgen_options = '-backtrack -crop_at_gmwmi -select 5k -seeds 10M'
if app.args.tckgen_options:
tckgen_options = app.args.tckgen_options
# Need to extract the target number of streamlines from tckgen_options
# Beware that this may include a postfix multiplier
tckgen_options_split = tckgen_options.split(' ')
if not '-select' in tckgen_options_split:
app.error('Contents of -tckgen_options must at least set the -select option')
target_count = tckgen_options_split[tckgen_options_split.index('-select')+1]
if target_count[-1].isalpha():
multiplier = 1
if target_count[-1].lower() == 'k':
multiplier = 1000
elif target_count[-1].lower() == 'm':
multiplier = 1000000
elif target_count[-1].lower() == 'b':
multiplier = 1000000000
else:
app.error('Could not convert -select field \'' + target_count + '\' to an integer')
target_count = int(target_count[:-1]) * multiplier
else:
target_count = int(target_count)
app.makeTempDir()
# Only the mask image is copied to the temporary directory for convenience; the others can be used in-place
run.command('mrconvert ' + path.fromUser(app.args.input_mask, True) + ' ' + path.toTemp('mask.mif', True) + ' -datatype bit')
app.gotoTempDir()
if len(image.Header('mask.mif').size()) != 3:
app.error('Input mask image must be a 3D image')
run.command('maskdump mask.mif input_voxels.txt')
input_voxels = [ _.split() for _ in open('input_voxels.txt', 'r').read().splitlines() if _ ]
# Images of:
# - The number of streamline attempts from each voxel
# - The number of streamlines generated from each voxel
# - The number of streamlines successfully reaching a target node from each voxel
# This now needs to be done at the tracking step instead:
# some voxels may abort early
run.command('mrthreshold mask.mif empty_mask.mif -abs 1.5')
run.command('mrconvert empty_mask.mif num_attempts.mif -datatype uint32')
run.command('mrconvert empty_mask.mif num_streamlines.mif -datatype uint32')
run.command('mrconvert empty_mask.mif num_assigned.mif -datatype uint32')
def trackCounts(filepath):
count = None
total_count = None
tckinfo_output = run.command('tckinfo ' + filepath)[0]
for line in tckinfo_output.splitlines():
key_value = [ entry.strip() for entry in line.split(':') ]
if len(key_value) != 2:
continue
if key_value[0] == 'count':
count = int(key_value[1])
elif key_value[0] == 'total_count':
total_count = int(key_value[1])
return (count, total_count)
progress = app.progressBar('Tracking: 0 of ' + str(len(input_voxels)) + ' voxels processed', len(input_voxels))
voxel_counter = 0
failure_counter = 0
output_voxels = [ ]
output_data = [ ]
for v in input_voxels:
seed_path = 'seed_' + v[0] + '_' + v[1] + '_' + v[2] + '.mif'
tracks_path = 'tracks_' + v[0] + '_' + v[1] + '_' + v[2] + '.tck'
connectome_path = 'connectome_' + v[0] + '_' + v[1] + '_' + v[2] + '.csv'
run.command('mrconvert mask.mif ' + seed_path + ' -coord 0 ' + v[0] + ' -coord 1 ' + v[1] + ' -coord 2 ' + v[2])
run.command('tckgen ' + path.fromUser(app.args.input_fod, True) + ' ' + tracks_path + ' -act ' + path.fromUser(app.args.input_5tt, True) + ' -seed_image ' + seed_path + ' -seed_unidirectional ' + tckgen_options)
# Capture the number of tracks that needed to be generated;
# see if there's any useful contrast
# (actually, get the ratio of generated vs. accepted)
# Will lose this ability if changing to a fixed-number-of-streamlines-per-voxel seeding mechanism...
# Reject voxel if the requested number of streamlines was not generated
if os.path.isfile(tracks_path):
counts = trackCounts(tracks_path)
run.command('mredit num_streamlines.mif -voxel ' + ','.join(v) + ' ' + str(counts[0]))
run.command('mredit num_attempts.mif -voxel ' + ','.join(v) + ' ' + str(counts[1]))
if counts[0] == target_count:
run.command('tck2connectome ' + tracks_path + ' ' + path.fromUser(app.args.input_parc, True) + ' ' + connectome_path + ' -vector -keep_unassigned')
with open(connectome_path, 'r') as f:
connectome = [ int(i) for i in f.read().split() ]
run.command('mredit num_assigned.mif -voxel ' + ','.join(v) + ' ' + str(sum(connectome[1:])))
output_voxels.append(v)
output_data.append(connectome)
else:
failure_counter += 1
app.debug('Tracking aborted for voxel ' + ','.join(v) + ' (' + str(counts[0]) + ' of ' + str(counts[1]) + ' streamlines accepted)')
# Never keep, even with -nocleanup
# Do however need to check their existence, in case we're using -continue
if os.path.isfile(seed_path):
os.remove(seed_path)
if os.path.isfile(tracks_path):
os.remove(tracks_path)
voxel_counter += 1
progress.increment('Tracking: ' + str(voxel_counter) + ' of ' + str(len(input_voxels)) + ' voxels processed')
progress.done()
if failure_counter:
app.warn(str(failure_counter) + ' of ' + str(len(input_voxels)) + ' voxels not successfully tracked')
with open(path.fromUser(app.args.output_voxels, False), 'w') as f:
for v in output_voxels:
f.write(','.join(v) + '\n')
with open(path.fromUser(app.args.output_data, False), 'w') as f:
for d in output_data:
f.write(','.join([str(i) for i in d]) + '\n')
if not app.args.grad_image:
app.complete()
exit(0)
# Rather than going straight to clustering, generate an image of 'connectivity gradient'
# Difference in connectivity profiles between adjacent voxels in 3 dimensions
# Get the voxel sizes of the mask image; these must be used to scale the gradient calculations
spacings = image.Header('mask.mif').spacing()
# Construct the list of possible voxel offsets, remembering that the opposite offset will also be tested
offsets = [ [0,0,1], [0,1,0], [1,0,0], [0,1,1], [1,0,1], [1, 1, 0], [0,1,-1], [1,0,-1], [1,-1,0], [1,1,1], [1,1,-1], [1,-1,1], [-1,1,1] ]
offset_length_multipliers = [ (1.0/length) for length in [ math.sqrt(float(o[0]*o[0]+o[1]*o[1]+o[2]*o[2])) for o in offsets ] ]
unit_offsets = [ [ float(f)*m for f in o ] for o, m in zip(offsets, offset_length_multipliers) ]
# Want to know the length in mm of each of these offsets, such that the output gradient is in units of (CosSim.mm^-1)
scale_factors = [ (1.0/length) for length in [ math.sqrt(math.pow(spacings[0]*o[0],2.0)+math.pow(spacings[1]*o[1],2.0)+math.pow(spacings[2]*o[2],2.0)) for o in offsets ] ]
# First, generate a bogus file here: That will allow the -continue option to be used
# Actually, instead, delay creation of the output image to this point
# Output image must now be 4D with 3 volumes
# Scratch that: _13_ volumes
# Need to embed the gradient directions within the image header
# Write them to a file also
direction_string = ''
with open('directions.txt', 'w') as f:
for direction in unit_offsets:
text = ','.join(str(d) for d in direction) + '\n'
f.write (text)
direction_string += text
direction_string = direction_string[:-1] # Remove the trailing newline character
run.command('mrcat ' + ' '.join( ['empty_mask.mif']*13) + ' -axis 3 - | mrconvert - result.mif -stride 0,0,0,1 -datatype float32 -set_property directions \"' + direction_string + '\"')
# Store all connectivity vectors in memory
# Load them here rather than as they are generated so that the RAM isn't used up unnecessarily -
# also just to make sure that they're loaded correctly if -continue is used
connectomes = { }
progress = app.progressBar('Loading connectomes into memory', len(input_voxels))
for v in input_voxels:
connectome_path = 'connectome_' + v[0] + '_' + v[1] + '_' + v[2] + '.csv'
if os.path.exists(connectome_path):
with open(connectome_path, 'r') as f:
connectome = [ int(i) for i in f.read().split() ]
connectomes[tuple(v)] = connectome
progress.increment()
progress.done()
# TODO Replace with Gini coefficient?
def CosSim(one, two): # Cosine Similarity
if not isinstance(one, list) or not isinstance(two, list):
app.error('Internal error: CosSim() function intended to work on lists')
if not isinstance(one[0], int) or not isinstance(two[0], int):
app.error('Internal error: CosSim() function intended to work on lists of integers')
if len(one) != len(two):
app.error('Internal error: Mismatched connectome vector lengths')
# The following check shouldn't really be required anymore, since voxels in which tracking
# fails are omitted from the analysis, but let's leave it here anyway
one_norm = math.sqrt(float(sum(i*i for i in one[1:])))
two_norm = math.sqrt(float(sum(i*i for i in two[1:])))
if not one_norm or not two_norm:
return 1.0
one_normalised = [ float(j) / one_norm for j in one[1:] ]
two_normalised = [ float(j) / two_norm for j in two[1:] ]
return sum( [ one_normalised[_] * two_normalised[_] for _ in range(0, len(one_normalised)) ] )
# Is it possible to get something that's directional?
# Maybe, rather than just the three axes, test diagonals as well
# Total of 13 directions, so can't get an lmax=4 fit, but can use dixel overlay plot, and
# rotate directions based on rigid transformation
# - Could potentially get lmax=4 with spatial regularisation...
# Maybe then do an lmax=2 fit and get peak direction / components in scanner XYZ?
# Also: Scale appropriately depending on (anisotropic) voxel size
# TODO This will run exceptionally slowly if the target is on a network file system
# Alternatively, could concatenate the whole lot into a single mredit call?
# For each voxel, calculate 'gradient' in connectivity profile in 13 directions
# One or both of the adjacent voxels in any particular direction may be absent from the mask - handle appropriately
progress = app.progressBar('Gradient calculation: 0 of ' + str(len(input_voxels)) + ' voxels processed', len(input_voxels))
voxel_counter = 0
for v in input_voxels:
if tuple(v) in connectomes: # Some voxels may have failed tracking
# TODO Could at least concatenate the 13 values for a single voxel into a single mredit call
for index, (offset, scale_factor) in enumerate(zip(offsets, scale_factors)):
vneg = [ str(int(v[axis]) - offset[axis]) for axis in range(0,3) ]
vpos = [ str(int(v[axis]) + offset[axis]) for axis in range(0,3) ]
grad = 0.0
if vneg in input_voxels:
grad += (1.0 - CosSim(connectomes[tuple(v)], connectomes[tuple(vneg)])) * scale_factor
if vpos in input_voxels:
grad += (1.0 - CosSim(connectomes[tuple(v)], connectomes[tuple(vpos)])) * scale_factor
run.command('mredit result.mif -voxel ' + ','.join(v) + ',' + str(index) + ' ' + str(grad))
voxel_counter += 1
progress.increment('Gradient calculation: ' + str(voxel_counter) + ' of ' + str(len(input_voxels)) + ' voxels processed')
progress.done()
run.command('mrconvert result.mif ' + path.fromUser(app.args.grad_image, True) + (' -force' if app.args.force else ''))
app.complete()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.