Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save christian-oreilly/f0c3e262343ecd580d92f4c8244cc87f to your computer and use it in GitHub Desktop.
Save christian-oreilly/f0c3e262343ecd580d92f4c8244cc87f to your computer and use it in GitHub Desktop.
from nibabel.freesurfer.io import read_geometry, write_geometry
import numpy as np
from pathlib import Path
import nibabel as nib
import sys
from scipy import spatial
import pandas as pd
from tqdm.notebook import tqdm
from mne.bem import make_watershed_bem
import signal
import subprocess as sp
class VerboseCalledProcessError(sp.CalledProcessError):
def __str__(self):
if self.returncode and self.returncode < 0:
try:
msg = "Command '%s' died with %r." % (
self.cmd, signal.Signals(-self.returncode))
except ValueError:
msg = "Command '%s' died with unknown signal %d." % (
self.cmd, -self.returncode)
else:
msg = "Command '%s' returned non-zero exit status %d." % (
self.cmd, self.returncode)
return f'{msg}\n' \
f'Stdout:\n' \
f'{self.output}\n' \
f'Stderr:\n' \
f'{self.stderr}'
def bash(cmd, print_stdout=True, print_stderr=True):
proc = sp.Popen(cmd, stderr=sp.PIPE, stdout=sp.PIPE)
#, shell=True, universal_newlines=True,
#executable='/bin/bash')
all_stdout = []
all_stderr = []
while proc.poll() is None:
for stdout_line in proc.stdout:
if stdout_line != '':
if print_stdout:
print(stdout_line, end='')
all_stdout.append(stdout_line)
for stderr_line in proc.stderr:
if stderr_line != '':
if print_stderr:
print(stderr_line, end='', file=sys.stderr)
all_stderr.append(stderr_line)
stdout_text = ''.join([x.decode() for x in all_stdout])
stderr_text = ''.join([x.decode() for x in all_stderr] )
if proc.wait() != 0:
raise VerboseCalledProcessError(proc.returncode, cmd, stdout_text, stderr_text)
root = Path("/media/christian/ElementsSE/lewis/CIVET-2.1.0/")
fs_subject_dir = Path("/usr/local/freesurfer/subjects/")
# Extracting BEM surfaces using the wathershed algorithm
for file in tqdm(list(root.glob("*"))):
subject = file.name
file_name_in = "{}_t1_final.mnc".format(subject)
path_in = root / subject / "final" / file_name_in
img = nib.load(str(path_in))
path_out = fs_subject_dir / subject / "mri" / "T1.mgz"
path_out.parent.mkdir(parents=True, exist_ok=True)
data = np.asanyarray(img.dataobj, dtype=np.float32)
img = nib.MGHImage(data, affine=img.affine)
nib.save(img, str(path_out))
for file in tqdm(list(root.glob("*"))):
subject = file.name
make_watershed_bem(subject, subjects_dir=None, overwrite=True, volume='T1', atlas=False,
gcaatlas=False, preflood=None, show=False, copy=True,
T1=None, brainmask='ws', verbose=None)
for file in tqdm(list(root.glob("*"))):
subject = file.name
for surface in ["inner_skull", "outer_skull", "outer_skin"]:
path_in = fs_subject_dir / subject / "bem" / "watershed" / "{}_{}_surface".format(subject, surface)
path_out = fs_subject_dir / subject / "bem" / "{}.surf".format(surface)
vertices, faces, volume_info = read_geometry(str(path_in), read_metadata=True)
vert = vertices #+ volume_info["cras"]
write_geometry(str(path_out),
np.array([-vert[:, 2], vert[:, 0], -vert[:, 1]]).T,
faces, volume_info=volume_info)
# Correction for wrong bem on the lower surface due to the bounding box around the brain on the MRI being too small
for file in tqdm(list(root.glob("*"))):
subject = file.name
vertices = {}
head_surfaces = ["inner_skull", "outer_skull", "outer_skin"]
for surface in head_surfaces:
path_in = fs_subject_dir / subject / "bem" / "{}.surf".format(surface)
vertices[surface], faces, volume_info = read_geometry(str(path_in), read_metadata=True)
th = {}
th["outer_skin"] = np.percentile(vertices["outer_skin"][:, 1], 98)-20
th["outer_skull"] = th["outer_skin"]-10
th["inner_skull"] = th["outer_skin"]-40
for inner_surface, outer_surface in zip(head_surfaces[:2], head_surfaces[1:]):
df = {}
for surface in [inner_surface, outer_surface]:
cond = vertices[surface][:, 0] > th[surface]
df[surface] = pd.DataFrame({surface: vertices[surface][:, 0],
"y": np.round(vertices[surface][:, 1]),
"z": np.round(vertices[surface][:, 2]),
"cond": cond,
"no_vert": np.arange(len(vertices[surface]), dtype=int)
})
#df[surface]["angles"] = np.round(np.arctan2(df[surface]["y"], df[surface]["z"])*20)/20.0
#df[surface]["r"] = np.sqrt(df[surface]["y"]**2 + df[surface]["z"]**2)
#rmax_df = df[surface].loc[cond, ["angles", "r"]].groupby("angles").max().reset_index().rename(columns={"r":"rmax"})
#df[surface] = df[surface].merge(rmax_df, on="angles")
#df[surface]["cond2"] = df[surface]["cond"]
#df[surface].loc[df[surface]["r"]/df[surface]["rmax"] > 0.95, "cond2"] = False
x, y, z = zip(*df[inner_surface].loc[df[inner_surface]["cond"], [inner_surface, "y", "z"]].values)
tree = spatial.KDTree(list(zip(y, z)))
tree.data_lut = {(y, z):x for x, y, z in zip(x, y, z)}
for y, z, x, no_vert in df[outer_surface].loc[df[outer_surface]["cond"],
["y", "z", outer_surface, "no_vert"]].values:
inds = tree.query_ball_point([y, z], r=2)
if len(inds) == 0:
dist, ind = tree.query([y, z])
if dist < 10:
inds = [ind]
if len(inds):
max_x = np.max([tree.data_lut[tuple(tree.data[ind])] for ind in inds])
if max_x+2 > x: # need correction?
vertices[outer_surface][int(no_vert), 0] = max_x+2
for surface in head_surfaces:
path_out = fs_subject_dir / subject / "bem" / "{}.surf".format(surface)
_, faces, volume_info = read_geometry(str(path_in), read_metadata=True)
write_geometry(str(path_out), vertices[surface],
faces, volume_info=volume_info)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment