Last active
May 20, 2022 21:57
-
-
Save philipmac/6bde128c1f3ecc0fa3aa33b3fc2eccdb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import json | |
import subprocess | |
import re | |
import math | |
import glob | |
from typing import List, Dict | |
from pathlib import Path | |
import shutil | |
import tempfile | |
import prefect | |
from jinja2 import Environment, FileSystemLoader | |
from prefect import task, Flow, Parameter, unmapped, flatten | |
from prefect.engine import signals | |
# from prefect.tasks.shell import ShellTask | |
from image_portal_workflows.utils import utils | |
from image_portal_workflows.shell_task_echo import ShellTaskEcho | |
from image_portal_workflows.config import Config | |
shell_task = ShellTaskEcho(log_stderr=True, return_all=True, stream_output=True) | |
@task | |
def make_work_dir(fname) -> Path: | |
""" | |
a temporary dir to house all files in the form: | |
{Config.tmp_dir}{fname.stem}. | |
eg: /gs1/home/macmenaminpe/tmp/tmp7gcsl4on/tomogram_fname/ | |
Will be rm'd upon completion. | |
""" | |
working_dir = Path(tempfile.mkdtemp(dir=f"{Config.tmp_dir}")) | |
prefect.context.get("logger").info( | |
f"created working_dir {working_dir} for {fname.as_posix()}" | |
) | |
return Path(working_dir) | |
@task | |
def copy_template(working_dir: Path, template_name: str) -> Path: | |
""" | |
copies the template adoc file to the working_dir, | |
""" | |
adoc_fp = f"{working_dir}/{template_name}.adoc" | |
template_fp = f"{Config.template_dir}/{template_name}.adoc" | |
logger = prefect.context.get("logger") | |
logger.info(f"trying to copy {template_fp} to {adoc_fp}") | |
shutil.copyfile(template_fp, adoc_fp) | |
return Path(adoc_fp) | |
@task | |
def list_input_dir(input_dir_fp: Path) -> List[Path]: | |
""" | |
discover the contents of the input_dir AKA "Sample" | |
note, only lists mrc files currently. TODO(?) | |
include .st files TODO | |
note, only processing first file ATM (test) | |
""" | |
logger = prefect.context.get("logger") | |
logger.info(f"trying to list {input_dir_fp}") | |
mrc_files = glob.glob(f"{input_dir_fp}/*.mrc") | |
if len(mrc_files) == 0: | |
raise signals.FAIL(f"Unable to find any input files in dir: {input_dir_fp}") | |
mrc_fps = [Path(f) for f in mrc_files] | |
# TESTING IGNORE | |
# mrc_fps = [Path(f"/home/macmenaminpe/data/brt_run/Projects/ABC2/2013-1220-dA30_5-BSC-1_10.mrc")] | |
logger = prefect.context.get("logger") | |
logger.info(f"Found {mrc_fps}") | |
return mrc_fps | |
@task | |
def copy_tg_to_working_dir(fname: Path, working_dir: Path) -> Path: | |
""" | |
copies files (tomograms/mrc files) into working_dir | |
returns Path of copied file | |
""" | |
# TESTING IGNORE | |
# fp = Path(f"/home/macmenaminpe/data/brt_run/Projects/ABC2/{fname.name}") | |
fp = shutil.copyfile(src=fname.as_posix(), dst=f"{working_dir}/{fname.name}") | |
return Path(fp) | |
@task | |
def update_adoc( | |
adoc_fp: Path, | |
tg_fp: Path, | |
dual: str, | |
montage: str, | |
gold: str, | |
focus: str, | |
bfocus: str, | |
fiducialless: str, | |
trackingMethod: str, | |
TwoSurfaces: str, | |
TargetNumberOfBeads: str, | |
LocalAlignments: str, | |
THICKNESS: str, | |
) -> Path: | |
"""updates the adoc file with input params,""" | |
file_loader = FileSystemLoader(str(adoc_fp.parent)) | |
env = Environment(loader=file_loader) | |
template = env.get_template(adoc_fp.name) | |
name = tg_fp.stem | |
stackext = tg_fp.suffix | |
currentBStackExt = "mrc" # TODO - should be a tupple of fps | |
datasetDirectory = adoc_fp.parent | |
if TwoSurfaces == "0": | |
SurfacesToAnalyze = 1 | |
elif TwoSurfaces == "1": | |
SurfacesToAnalyze = 2 | |
else: | |
raise signals.FAIL( | |
f"Unable to resolve SurfacesToAnalyze, TwoSurfaces \ | |
is set to {TwoSurfaces}, and should be 0 or 1" | |
) | |
rpa_thickness = int(THICKNESS) * 1.5 | |
vals = { | |
"name": name, | |
"stackext": stackext, | |
"currentBStackExt": currentBStackExt, | |
"dual": dual, | |
"montage": montage, | |
"gold": gold, | |
"focus": focus, | |
"bfocus": bfocus, | |
"datasetDirectory": datasetDirectory, | |
"fiducialless": fiducialless, | |
"trackingMethod": trackingMethod, | |
"TwoSurfaces": TwoSurfaces, | |
"TargetNumberOfBeads": TargetNumberOfBeads, | |
"SurfacesToAnalyze": SurfacesToAnalyze, | |
"LocalAlignments": LocalAlignments, | |
"rpa_thickness": rpa_thickness, | |
"THICKNESS": THICKNESS, | |
} | |
# junk above for now. | |
# vals = { | |
# "basename": name, | |
# "bead_size": 10, | |
# "light_beads": 0, | |
# "tilt_thickness": 256, | |
# "montage": 0, | |
# "dataset_dir": str(adoc_fp.parent), | |
# } | |
output = template.render(vals) | |
adoc_loc = Path(f"{adoc_fp.parent}/{tg_fp.stem}.adoc") | |
with open(adoc_loc, "w") as _file: | |
print(output, file=_file) | |
logger = prefect.context.get("logger") | |
logger.info(f"generated {adoc_loc}") | |
return adoc_loc | |
@task | |
def create_brt_command(adoc_fp: Path) -> str: | |
cmd = f"{Config.brt_binary} -di {adoc_fp.as_posix()} -cp 8 -gpu 1" | |
logger = prefect.context.get("logger") | |
logger.info(cmd) | |
return cmd | |
# to short test | |
# return "ls" | |
@task | |
def gen_dimension_command(tg_fp: Path) -> str: | |
command = f"header -s {tg_fp.parent}/{tg_fp.stem}_ali.mrc" | |
a = subprocess.run(command, shell=True, check=True, capture_output=True) | |
outputs = a.stdout | |
xyz_dim = re.split(" +(\d+)", str(outputs)) | |
z_dim = xyz_dim[5] | |
logger = prefect.context.get("logger") | |
logger.info(f"z_dim: {z_dim:}") | |
return z_dim | |
@task | |
def check_brt_run_ok(tg_fp: Path): | |
""" | |
ensures the following files exist: | |
BASENAME_rec.mrc - the source for the reconstruction movie | |
BASENAME_full_rec.mrc - the source for the Neuroglancer pyramid | |
BASENAME_ali.mrc | |
""" | |
rec_file = Path(f"{tg_fp.parent}/{tg_fp.stem}_rec.mrc") | |
full_rec_file = Path(f"{tg_fp.parent}/{tg_fp.stem}_full_rec.mrc") | |
ali_file = Path(f"{tg_fp.parent}/{tg_fp.stem}_ali.mrc") | |
for _file in [rec_file, full_rec_file, ali_file]: | |
if not _file.exists(): | |
raise signals.FAIL(f"File {_file} does not exist. BRT run failure.") | |
@task | |
def gen_ns_cmnds(fp: Path, z_dim) -> List[str]: | |
""" | |
newstack -secs {i}-{i} path/BASENAME_ali*.mrc WORKDIR/hedwig/BASENAME_ali{i}.mrc | |
""" | |
cmds = list() | |
for i in range(1, int(z_dim)): | |
cmds.append( | |
f"newstack -secs {i}-{i} {fp.parent}/{fp.stem}_ali*.mrc {fp.parent}/{fp.stem}.mrc" | |
) | |
logger = prefect.context.get("logger") | |
logger.info(cmds) | |
return cmds | |
@task | |
def gen_ns_float(fp: Path) -> str: | |
""" | |
newstack -float 3 path/BASENAME_ali*.mrc path/ali_BASENAME.mrc | |
""" | |
in_fp = f"{fp.parent}/{fp.stem}_ali*.mrc" | |
out_fp = f"{fp.parent}/ali_{fp.stem}.mrc" | |
cmd = f"newstack -float 3 {in_fp} {out_fp}" | |
logger = prefect.context.get("logger") | |
logger.info(cmd) | |
return cmd | |
@task | |
def gen_mrc2tiff(fp: Path) -> str: | |
"""mrc2tif -j -C 0,255 path/ali_BASENAME.mrc path/BASENAME_ali""" | |
in_fp = f"{fp.parent}/ali_{fp.stem}.mrc" | |
out_fp = f"{fp.parent}/{fp.stem}_ali" | |
cmd = f"mrc2tif -j -C 0,255 {in_fp} {out_fp}" | |
logger = prefect.context.get("logger") | |
logger.info(cmd) | |
return cmd | |
@task | |
def gen_gm_convert(fp: Path, middle_i: str) -> List[str]: | |
""" | |
generates the keyThumbnail | |
gm convert -size 300x300 path/BASENAME_ali.{MIDDLE_I}.jpg -resize 300x300 -sharpen 2 -quality 70 path/keyimg_BASENAME_s.jpg | |
""" | |
middle_i_fname = f"{fp.parent}/{fp.stem}_ali.{middle_i}.jpg" | |
thumbnail_fp = f"{fp.parent}/keyimg_{fp.stem}_s.jpg" | |
cmd = f"gm convert -size 300x300 {middle_i_fname} -resize 300x300 \ | |
-sharpen 2 -quality 70 {thumbnail_fp}" | |
logger = prefect.context.get("logger") | |
logger.info(cmd) | |
return [cmd, thumbnail_fp] | |
@task | |
def calc_middle_i(z_dim: str) -> str: | |
""" | |
we want to find the middle image of the stack (for use as thumbnail) | |
the file name later needed is padded with zeros | |
TODO - this might be 3 or 4 - make this not a magic number. | |
""" | |
fl = math.floor(int(z_dim) / 2) | |
fl_padded = str(fl).rjust(3, "0") | |
logger = prefect.context.get("logger") | |
logger.info(f"middle i: {fl_padded}") | |
return fl_padded | |
@task | |
def gen_ffmpeg_cmd(fp: Path) -> List[str]: | |
""" | |
generates the tilt moveie | |
ffmpeg -f image2 -framerate 4 -i ${BASENAME}_ali.%03d.jpg -vcodec \ | |
libx264 -pix_fmt yuv420p -s 1024,1024 tiltMov_${BASENAME}.mp4 | |
""" | |
input_fp = f"{fp.parent}/{fp.stem}_ali.%03d.jpg" | |
out_fp = f"{fp.parent}/tiltMov_{fp.stem}.mp4" | |
cmd = f"ffmpeg -y -f image2 -framerate 4 -i {input_fp} -vcodec \ | |
libx264 -pix_fmt yuv420p -s 1024,1024 {out_fp}" | |
logger = prefect.context.get("logger") | |
logger.info(f"ffmpeg command: {cmd}") | |
return [cmd, out_fp] | |
# "_rc" denotes ReConstruction movie related funcs | |
@task | |
def gen_clip_rc_cmds(fp: Path, z_dim) -> List[str]: | |
""" | |
for i in range(2, dimensions.z-2): | |
IZMIN = i-2 | |
IZMAX = i+2 | |
clip avg -2d -iz IZMIN-IZMAX -m 1 path/BASENAME_rec.mrc path/hedwig/BASENAME_ave${I}.mrc | |
""" | |
cmds = list() | |
for i in range(2, int(z_dim) - 2): | |
izmin = i - 2 | |
izmax = i + 2 | |
in_fp = f"{fp.parent}/{fp.stem}_rec.mrc" | |
out_fp = f"{fp.parent}/{fp.stem}_ave{str(i)}.mrc" | |
cmd = f"clip avg -2d -iz {str(izmin)}-{str(izmax)} -m 1 {in_fp} {out_fp}" | |
cmds.append(cmd) | |
logger = prefect.context.get("logger") | |
logger.info(f"reconstruction clip_cmds : {cmds}") | |
return cmds | |
@task | |
def newstack_fl_rc_cmd(fp: Path) -> List[str]: | |
""" | |
newstack -float 3 path/BASENAME_ave* path/ave_BASENAME.mrc | |
""" | |
fp_in = f"{fp.parent}/{fp.stem}_ave*" | |
fp_out = f"{fp.parent}/ave_{fp.stem}.mrc" | |
cmd = f"sleep 10 && newstack -float 3 {fp_in} {fp_out}" | |
logger = prefect.context.get("logger") | |
logger.info(f"reconstruction newstack_fl_rc_cmd {cmd}") | |
return [cmd, fp_out] | |
@task | |
def gen_binvol_rc_cmd(fp: Path) -> List[str]: | |
""" | |
binvol -binning 2 path/ave_BASENAME.mrc path/avebin8_BASENAME.mrc | |
""" | |
fp_in = f"{fp.parent}/ave_{fp.stem}.mrc" | |
fp_out = f"{fp.parent}/avebin8_{fp.stem}.mrc" | |
cmd = f"binvol -binning 2 {fp_in} {fp_out}" | |
logger = prefect.context.get("logger") | |
logger.info(f"reconstruction binvol command: {cmd}") | |
return [cmd, fp_out] | |
@task | |
def gen_mrc2tiff_rc_cmd(fp: Path) -> str: | |
""" | |
mrc2tif -j -C 100,255 path/ave_BASNAME.mrc path/BASENAME_mp4 | |
""" | |
fp_in = f"{fp.parent}/ave_{fp.stem}.mrc" | |
fp_out = f"{fp.parent}/{fp.stem}_mp4" | |
cmd = f"mrc2tif -j -C 100,255 {fp_in} {fp_out}" | |
logger = prefect.context.get("logger") | |
logger.info(f"mrc2tiff command: {cmd}") | |
return cmd | |
@task | |
def gen_ffmpeg_rc_cmd(fp: Path) -> List[str]: | |
""" | |
TODO - 3 or 4? | |
ffmpeg -f image2 -framerate 8 -i path/BASENAME_mp4.%03d.jpg -vcodec \ | |
libx264 -pix_fmt yuv420p -s 1024,1024 path/keyMov_BASENAME.mp4 | |
""" | |
fp_in = f"{fp.parent}/{fp.stem}_mp4.%03d.jpg" | |
fp_out = f"{fp.parent}/keyMov_{fp.stem}.mp4" | |
cmd = f"ffmpeg -f image2 -framerate 8 -y -i {fp_in} -vcodec \ | |
libx264 -pix_fmt yuv420p -s 1024,1024 {fp_out}" | |
logger = prefect.context.get("logger") | |
logger.info(f"reconstruction ffmpeg command: {cmd}") | |
return [cmd, fp_out] | |
@task | |
def gen_mrc2nifti_cmd(fp: Path) -> str: | |
""" | |
mrc2nifti path/{basename}_full_rec.mrc path/{basename}.nii | |
""" | |
cmd = f"mrc2nifti {fp.parent}/{fp.stem}_full_rec.mrc {fp.parent}/{fp.stem}.nii" | |
logger = prefect.context.get("logger") | |
logger.info(f"mrc2nifti command: {cmd}") | |
return cmd | |
@task | |
def gen_pyramid_cmd(fp: Path) -> List[str]: | |
""" | |
volume-to-precomputed-pyramid --downscaling-method=average --no-gzip --flat path/basename.nii path/neuro-basename | |
""" | |
outdir = Path(f"{fp.parent}/neuro-{fp.stem}") | |
logger = prefect.context.get("logger") | |
if outdir.exists(): | |
shutil.rmtree(outdir) | |
logger.info(f"removing {outdir}") | |
cmd = f"volume-to-precomputed-pyramid --downscaling-method=average --no-gzip \ | |
--flat {fp.parent}/{fp.stem}.nii {outdir}" | |
logger = prefect.context.get("logger") | |
logger.info(f"pyramid command: {cmd}") | |
return [cmd, outdir.as_posix()] | |
@task | |
def gen_min_max_cmd(fp: Path) -> List[str]: | |
""" | |
mrc_visual_min_max {basename}.nii --mad 5 --output-json mrc2ngpc-output.json | |
""" | |
out_fp = f"{fp.parent}/mrc2ngpc-output.json" | |
cmd = f"mrc_visual_min_max {fp.parent}/{fp.stem}.nii --mad 5 \ | |
--output-json {out_fp}" | |
logger = prefect.context.get("logger") | |
logger.info(f"gen_min_max_cmd command: {cmd}") | |
return [cmd, out_fp] | |
@task | |
def parse_min_max_file(l: List) -> Dict[str, str]: | |
fp = l[1] | |
with open(fp, "r") as _file: | |
return json.load(_file) | |
# with Flow("brt_flow", executor=Config.SLURM_EXECUTOR) as f: | |
with Flow("brt_flow") as f: | |
# This block of params map are for adoc file specfication. | |
# Note the ugly names, these parameters are lifted verbatim from | |
# https://bio3d.colorado.edu/imod/doc/directives.html where possible. | |
# (there are two thickness args, these are not verbatim.) | |
dual = Parameter("dual") | |
montage = Parameter("montage") | |
gold = Parameter("gold") | |
focus = Parameter("focus") | |
bfocus = Parameter("bfocus") | |
fiducialless = Parameter("fiducialless") | |
trackingMethod = Parameter("trackingMethod") | |
TwoSurfaces = Parameter("TwoSurfaces") | |
TargetNumberOfBeads = Parameter("TargetNumberOfBeads") | |
LocalAlignments = Parameter("LocalAlignments") | |
THICKNESS = Parameter("THICKNESS") | |
# end user facing adoc params | |
adoc_template = Parameter("adoc_template") | |
input_dir = Parameter("input_dir") | |
callback_url = Parameter("callback_url")() | |
token = Parameter("token")() | |
sample_id = Parameter("sample_id")() | |
# a single input_dir will have n tomograms | |
input_dir_fp = utils.get_input_dir(input_dir=input_dir) | |
fnames = list_input_dir(input_dir_fp=input_dir_fp) | |
temp_dirs = make_work_dir.map(fnames) | |
adoc_fps = copy_template.map( | |
working_dir=temp_dirs, template_name=unmapped(adoc_template) | |
) | |
updated_adocs = update_adoc.map( | |
adoc_fp=adoc_fps, | |
tg_fp=fnames, | |
dual=unmapped(dual), | |
montage=unmapped(montage), | |
gold=unmapped(gold), | |
focus=unmapped(focus), | |
bfocus=unmapped(bfocus), | |
fiducialless=unmapped(fiducialless), | |
trackingMethod=unmapped(trackingMethod), | |
TwoSurfaces=unmapped(TwoSurfaces), | |
TargetNumberOfBeads=unmapped(TargetNumberOfBeads), | |
LocalAlignments=unmapped(LocalAlignments), | |
THICKNESS=unmapped(THICKNESS), | |
) | |
tomogram_fps = copy_tg_to_working_dir.map(fname=fnames, working_dir=temp_dirs) | |
# START BRT (Batchruntomo) - long running process. | |
brt_commands = create_brt_command.map(adoc_fp=updated_adocs) | |
brts = shell_task.map( | |
command=brt_commands, | |
upstream_tasks=[tomogram_fps], | |
to_echo=unmapped("BRT commands"), | |
) | |
brts_ok = check_brt_run_ok.map(tg_fp=tomogram_fps, upstream_tasks=[brts]) | |
# END BRT, check files for success (else fail here) | |
# stack dimensions - used in movie creation | |
z_dims = gen_dimension_command.map(tg_fp=tomogram_fps, upstream_tasks=[brts_ok]) | |
# START TILT MOVIE GENERATION: | |
newstack_cmds = gen_ns_cmnds.map( | |
fp=tomogram_fps, z_dim=z_dims, upstream_tasks=[brts_ok] | |
) | |
newstacks = shell_task.map( | |
command=flatten(newstack_cmds), | |
upstream_tasks=[brts_ok], | |
to_echo=unmapped("newstack tilt"), | |
) | |
ns_float_cmds = gen_ns_float.map(fp=tomogram_fps, upstream_tasks=[newstacks]) | |
ns_floats = shell_task.map( | |
command=ns_float_cmds, to_echo=unmapped("ns_floats commands") | |
) | |
mrc2tiff_cmds = gen_mrc2tiff.map(fp=tomogram_fps, upstream_tasks=[ns_floats]) | |
mrc2tiffs = shell_task.map( | |
command=mrc2tiff_cmds, to_echo=unmapped("mrc2tiff commands") | |
) | |
middle_is = calc_middle_i.map(z_dim=z_dims) | |
# key_imgs are an output/asset - location needs to be reported. | |
gm_cmds_and_thumbnail_locs = gen_gm_convert.map( | |
fp=tomogram_fps, middle_i=middle_is, upstream_tasks=[mrc2tiffs] | |
) | |
gm_cmds = utils.to_command.map(cmd_and_fp=gm_cmds_and_thumbnail_locs) | |
gms = shell_task.map(command=gm_cmds, to_echo=unmapped("gm thumbnail commands")) | |
mpeg_cmds_and_tiltMovie_fps = gen_ffmpeg_cmd.map( | |
fp=tomogram_fps, upstream_tasks=[mrc2tiffs] | |
) | |
mpeg_cmds = utils.to_command.map(mpeg_cmds_and_tiltMovie_fps) | |
mpegs = shell_task.map(command=mpeg_cmds, to_echo=unmapped("ffmpeg comands")) | |
# END TILT MOVIE GENERATION | |
# # create assets directory | |
assets_dir = utils.make_assets_dir(input_dir=input_dir) | |
# generate callback body | |
files_elts = utils.generate_callback_files.map(input_fname=tomogram_fps) | |
# keyThumbnail | |
keyThumbnail_elts = utils.move_to_assets.map( | |
cmd_and_fp=gm_cmds_and_thumbnail_locs, | |
asset_dir=unmapped(assets_dir), | |
input_fp=tomogram_fps, | |
asset_type=unmapped("keyThumbnail"), | |
upstream_tasks=[gms], | |
) | |
files_elts = utils.add_assets.map( | |
assets_list=files_elts, new_asset=keyThumbnail_elts | |
) | |
# tiltMovie | |
tiltMovie_elts = utils.move_to_assets.map( | |
cmd_and_fp=mpeg_cmds_and_tiltMovie_fps, | |
asset_dir=unmapped(assets_dir), | |
input_fp=tomogram_fps, | |
asset_type=unmapped("tiltMovie"), | |
upstream_tasks=[mpegs], | |
) | |
files_elts = utils.add_assets.map( | |
assets_list=files_elts, new_asset=tiltMovie_elts | |
) | |
utils.print_t.map( | |
files_elts, | |
upstream_tasks=[ | |
keyThumbnail_elts, | |
tiltMovie_elts, | |
averagedVolume_elts, | |
volume_elts, | |
recMovie_elts, | |
ng_elts, | |
], | |
) | |
# utils.print_t.map(min_maxs) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment