Created
April 16, 2020 14:44
-
-
Save christian-oreilly/f0c3e262343ecd580d92f4c8244cc87f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from nibabel.freesurfer.io import read_geometry, write_geometry | |
import numpy as np | |
from pathlib import Path | |
import nibabel as nib | |
import sys | |
from scipy import spatial | |
import pandas as pd | |
from tqdm.notebook import tqdm | |
from mne.bem import make_watershed_bem | |
import signal | |
import subprocess as sp | |
class VerboseCalledProcessError(sp.CalledProcessError): | |
def __str__(self): | |
if self.returncode and self.returncode < 0: | |
try: | |
msg = "Command '%s' died with %r." % ( | |
self.cmd, signal.Signals(-self.returncode)) | |
except ValueError: | |
msg = "Command '%s' died with unknown signal %d." % ( | |
self.cmd, -self.returncode) | |
else: | |
msg = "Command '%s' returned non-zero exit status %d." % ( | |
self.cmd, self.returncode) | |
return f'{msg}\n' \ | |
f'Stdout:\n' \ | |
f'{self.output}\n' \ | |
f'Stderr:\n' \ | |
f'{self.stderr}' | |
def bash(cmd, print_stdout=True, print_stderr=True): | |
proc = sp.Popen(cmd, stderr=sp.PIPE, stdout=sp.PIPE) | |
#, shell=True, universal_newlines=True, | |
#executable='/bin/bash') | |
all_stdout = [] | |
all_stderr = [] | |
while proc.poll() is None: | |
for stdout_line in proc.stdout: | |
if stdout_line != '': | |
if print_stdout: | |
print(stdout_line, end='') | |
all_stdout.append(stdout_line) | |
for stderr_line in proc.stderr: | |
if stderr_line != '': | |
if print_stderr: | |
print(stderr_line, end='', file=sys.stderr) | |
all_stderr.append(stderr_line) | |
stdout_text = ''.join([x.decode() for x in all_stdout]) | |
stderr_text = ''.join([x.decode() for x in all_stderr] ) | |
if proc.wait() != 0: | |
raise VerboseCalledProcessError(proc.returncode, cmd, stdout_text, stderr_text) | |
root = Path("/media/christian/ElementsSE/lewis/CIVET-2.1.0/") | |
fs_subject_dir = Path("/usr/local/freesurfer/subjects/") | |
# Extracting BEM surfaces using the wathershed algorithm | |
for file in tqdm(list(root.glob("*"))): | |
subject = file.name | |
file_name_in = "{}_t1_final.mnc".format(subject) | |
path_in = root / subject / "final" / file_name_in | |
img = nib.load(str(path_in)) | |
path_out = fs_subject_dir / subject / "mri" / "T1.mgz" | |
path_out.parent.mkdir(parents=True, exist_ok=True) | |
data = np.asanyarray(img.dataobj, dtype=np.float32) | |
img = nib.MGHImage(data, affine=img.affine) | |
nib.save(img, str(path_out)) | |
for file in tqdm(list(root.glob("*"))): | |
subject = file.name | |
make_watershed_bem(subject, subjects_dir=None, overwrite=True, volume='T1', atlas=False, | |
gcaatlas=False, preflood=None, show=False, copy=True, | |
T1=None, brainmask='ws', verbose=None) | |
for file in tqdm(list(root.glob("*"))): | |
subject = file.name | |
for surface in ["inner_skull", "outer_skull", "outer_skin"]: | |
path_in = fs_subject_dir / subject / "bem" / "watershed" / "{}_{}_surface".format(subject, surface) | |
path_out = fs_subject_dir / subject / "bem" / "{}.surf".format(surface) | |
vertices, faces, volume_info = read_geometry(str(path_in), read_metadata=True) | |
vert = vertices #+ volume_info["cras"] | |
write_geometry(str(path_out), | |
np.array([-vert[:, 2], vert[:, 0], -vert[:, 1]]).T, | |
faces, volume_info=volume_info) | |
# Correction for wrong bem on the lower surface due to the bounding box around the brain on the MRI being too small | |
for file in tqdm(list(root.glob("*"))): | |
subject = file.name | |
vertices = {} | |
head_surfaces = ["inner_skull", "outer_skull", "outer_skin"] | |
for surface in head_surfaces: | |
path_in = fs_subject_dir / subject / "bem" / "{}.surf".format(surface) | |
vertices[surface], faces, volume_info = read_geometry(str(path_in), read_metadata=True) | |
th = {} | |
th["outer_skin"] = np.percentile(vertices["outer_skin"][:, 1], 98)-20 | |
th["outer_skull"] = th["outer_skin"]-10 | |
th["inner_skull"] = th["outer_skin"]-40 | |
for inner_surface, outer_surface in zip(head_surfaces[:2], head_surfaces[1:]): | |
df = {} | |
for surface in [inner_surface, outer_surface]: | |
cond = vertices[surface][:, 0] > th[surface] | |
df[surface] = pd.DataFrame({surface: vertices[surface][:, 0], | |
"y": np.round(vertices[surface][:, 1]), | |
"z": np.round(vertices[surface][:, 2]), | |
"cond": cond, | |
"no_vert": np.arange(len(vertices[surface]), dtype=int) | |
}) | |
#df[surface]["angles"] = np.round(np.arctan2(df[surface]["y"], df[surface]["z"])*20)/20.0 | |
#df[surface]["r"] = np.sqrt(df[surface]["y"]**2 + df[surface]["z"]**2) | |
#rmax_df = df[surface].loc[cond, ["angles", "r"]].groupby("angles").max().reset_index().rename(columns={"r":"rmax"}) | |
#df[surface] = df[surface].merge(rmax_df, on="angles") | |
#df[surface]["cond2"] = df[surface]["cond"] | |
#df[surface].loc[df[surface]["r"]/df[surface]["rmax"] > 0.95, "cond2"] = False | |
x, y, z = zip(*df[inner_surface].loc[df[inner_surface]["cond"], [inner_surface, "y", "z"]].values) | |
tree = spatial.KDTree(list(zip(y, z))) | |
tree.data_lut = {(y, z):x for x, y, z in zip(x, y, z)} | |
for y, z, x, no_vert in df[outer_surface].loc[df[outer_surface]["cond"], | |
["y", "z", outer_surface, "no_vert"]].values: | |
inds = tree.query_ball_point([y, z], r=2) | |
if len(inds) == 0: | |
dist, ind = tree.query([y, z]) | |
if dist < 10: | |
inds = [ind] | |
if len(inds): | |
max_x = np.max([tree.data_lut[tuple(tree.data[ind])] for ind in inds]) | |
if max_x+2 > x: # need correction? | |
vertices[outer_surface][int(no_vert), 0] = max_x+2 | |
for surface in head_surfaces: | |
path_out = fs_subject_dir / subject / "bem" / "{}.surf".format(surface) | |
_, faces, volume_info = read_geometry(str(path_in), read_metadata=True) | |
write_geometry(str(path_out), vertices[surface], | |
faces, volume_info=volume_info) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment