-
-
Save joshmoore/31303ef65820b8af5728d92a4f2d9b51 to your computer and use it in GitHub Desktop.
Simple OME-Zarr pangeo-forge example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import pandas as pd | |
import pangeo_forge | |
import pangeo_forge.utils | |
import zarr | |
from pangeo_forge.tasks.http import download | |
from pangeo_forge.tasks.xarray import combine_and_write | |
from pangeo_forge.tasks.zarr import consolidate_metadata | |
from prefect import Flow, Parameter, task, unmapped | |
from prefect.tasks.shell import ShellTask | |
@task | |
def source_url(unused: str) -> str: | |
return "https://downloads.openmicroscopy.org/images/DICOM/samples/US-MONO2-8-8x-execho.dcm" | |
@task | |
def build_command(cache: str, target: str): | |
""" | |
Primary logic, requires: | |
conda install -c ome bioformats2raw | |
""" | |
cmd = f"bioformats2raw --file_type=zarr --dimension-order=XYZCT " | |
cmd += f"{cache} {target}" | |
return cmd | |
@task | |
def consolidate(shell_result, path: str) -> None: | |
# TODO: check shell output | |
zarr.consolidate_metadata(zarr.open(path).store) | |
shell = ShellTask(log_stdout=True) | |
class Pipeline(pangeo_forge.AbstractPipeline): | |
name = "example" | |
repo = "ome/pangeo-forge-example" | |
cache_location = Parameter("cache_location", default=f"_tmp_download_{name}") | |
target_location = Parameter("target_location", default=f"_tmp_convert_{name}") | |
@property | |
def sources(self): | |
# This can be ignored for now. | |
pass | |
@property | |
def targets(self): | |
# This can be ignored for now. | |
pass | |
@property | |
def flow(self): | |
with Flow(self.name) as flow: | |
url = source_url(None) # TODO: Map all items from registry | |
file = download(url, cache_location=self.cache_location) | |
cmd = build_command(file, target=self.target_location) | |
zarr = shell(command=cmd) | |
# consolidate_metadata(self.target_location, writes=zarr) FIXME | |
consolidate(zarr, self.target_location) | |
return flow | |
flow = Pipeline().flow | |
if __name__ == "__main__": | |
flow.run() # This is for easy of running |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment