Skip to content

Instantly share code, notes, and snippets.

@satra
Created February 12, 2022 21:08
Show Gist options
  • Save satra/309b7a5dc91a0c9c3a4584804f3bca56 to your computer and use it in GitHub Desktop.
Save satra/309b7a5dc91a0c9c3a4584804f3bca56 to your computer and use it in GitHub Desktop.
rechunk an ngff file
from numcodecs import Blosc
import numpy as np
from pathlib import Path
from rechunker import rechunk
import sys
from tempfile import TemporaryDirectory
import zarr
import click
from pydra.mark import task, annotate
import pydra as pyd
import os
from glob import glob
from shutil import rmtree
def verify_copy(source_dir, target_dir, min_chunk):
z1 = zarr.NestedDirectoryStore(source_dir)
try:
source = zarr.Group(store=z1, read_only=True)
except zarr.errors.GroupNotFoundError:
return False
z2 = zarr.NestedDirectoryStore(target_dir)
try:
target = zarr.Group(store=z2, read_only=True)
except zarr.errors.GroupNotFoundError:
return False
if source.keys() != target.keys():
return False
_, target_chunks, _ = do_rechunk(source_dir, min_chunk)
for key in source:
if source[key].shape != target[key].shape:
print("verify: shape mismatch")
return False
if target_chunks[key] != target[key].chunks:
print("verify: chunk size mismatch")
return False
shape = source[key].shape
midslice = int(np.round(shape[-1]/2))
if not np.allclose(np.squeeze(source[key][0, 0, :, :, midslice]),
np.squeeze(target[key][0, 0, :, :, midslice])):
print("verify: midslice content mismatch")
return False
return True
def do_rechunk(ngff_source, min_chunk):
z = zarr.NestedDirectoryStore(ngff_source)
try:
source = zarr.Group(store=z, read_only=True)
except zarr.errors.GroupNotFoundError:
return True, None, None
target_chunks = {}
options = {}
if len(source) == 0:
return True, None, None
for key in source:
target_chunks[key] = source[key].chunks
options[key] = {"compressor": source[key].compressor, "order": source[key].order}
chunkdim = min(min_chunk, min([val for val in source[key].shape if val > 1]))
target_chunks[key] = (1, 1, chunkdim, chunkdim, chunkdim)
if all([target_chunks[key] == source[key].chunks for key in source]):
return False, None, None
else:
return True, target_chunks, options
def rechunk_ngff(ngff_source, ngff_target=None, min_chunk=128, max_mem="500M", tmpdir=None, verify=True):
"""Rechunk an NGFF file
Presently uses isotropic rechunking.
a few possibilities
- [X] .ngff: only source file and proper chunking
- [X] .ngff: only source file
- [X] .ngff.source: moved source exists
- [X] .ngff.source + .ngff: crash during rechunking
- [X] .ngff.source + .ngff: both files exist and are equal
"""
ext = ".source"
if ext in ngff_source:
moved_source = ngff_source
if Path(moved_source).exists():
ngff_target = moved_source.strip().rstrip(ext)
if Path(ngff_target).exists():
if verify_copy(moved_source, ngff_target, min_chunk):
print("source is moved source and proper ngff file exists")
return ngff_target
print("source is moved source and ngff file is not proper")
rmtree(ngff_target)
else:
raise ValueError(f"{moved_source} does not exist")
else:
moved_source = ngff_source + ext
process, _, _ = do_rechunk(ngff_source, min_chunk)
if not process:
print(f"{ngff_source} has correct chunks")
return ngff_source
if ngff_target is None:
ngff_target = ngff_source
if Path(moved_source).exists():
ngff_source = moved_source
if Path(ngff_target).exists():
if verify_copy(moved_source, ngff_target, min_chunk):
print("source is ngff, moved and ngff file exists")
return ngff_target
print("source is ngff, but not proper")
rmtree(ngff_target)
else:
ngff_source = Path(ngff_source).rename(moved_source)
if Path(ngff_source).resolve() == Path(ngff_target).resolve():
raise ValueError(f"source {ngff_source} and target {ngff_target} are the same locations")
z = zarr.NestedDirectoryStore(ngff_source)
source = zarr.Group(store=z, read_only=True)
process, target_chunks, options = do_rechunk(ngff_source, min_chunk)
assert process
target = zarr.NestedDirectoryStore(ngff_target)
with TemporaryDirectory(dir=tmpdir) as tmpdirname:
rechunked = rechunk(source,
target_chunks=target_chunks,
target_store=target,
target_options=options,
temp_store=tmpdirname,
max_mem=max_mem)
target_store = rechunked.execute()
zarr.consolidate_metadata(target)
if verify:
if not verify_copy(ngff_source, ngff_target, min_chunk):
raise Exception(f"Mismatch: {ngff_source} and {ngff_target}")
return target_store
convert_pdt = task(
annotate({"ngff_source": str,
"ngff_target": str,
"min_chunk": int,
"max_mem": str,
"tmpdir": str,
"verify": bool})(rechunk_ngff)
)
@click.command()
@click.argument("ngff_base", nargs=1, type=click.Path(exists=True, file_okay=True, dir_okay=True))
@click.option("--target", type=click.Path(exists=True, file_okay=True, dir_okay=True), default=None)
@click.option("--pattern", type=str, default="*.ngff")
@click.option("--chunk", type=int, default=128)
@click.option("--procs", type=int, default=2)
@click.option("--pydra/--no-pydra", default=True)
@click.option("--moved/--no-moved", default=False)
def convert(ngff_base, target, pattern, chunk, procs, pydra, moved):
if Path(ngff_base).is_dir() and not ngff_base.strip().endswith(".ngff"):
if moved:
ngff_files = sorted(glob(os.path.join(ngff_base, pattern + ".source")))
else:
ngff_files = sorted(glob(os.path.join(ngff_base, pattern)))
if target is not None:
raise ValueError(f"{target} cannot be provided when input is a directory")
else:
if not ngff_base.strip().endswith(".ngff"):
raise ValueError(f"{ngff_base} is not an ngff folder")
ngff_files = [ngff_base]
print(f"processing {len(ngff_files)} files")
if target is not None:
target = str(target)
if len(ngff_files):
if not pydra:
for ngff_file in ngff_files:
print(f"processing: {ngff_file}")
rechunk_ngff(ngff_file, ngff_target=target, min_chunk=chunk)
else:
ngff_files = [str(Path(val).absolute()) for val in ngff_files]
task = convert_pdt(ngff_source=ngff_files, ngff_target=target, min_chunk=chunk).split("ngff_source")
print("Starting pydra processing")
with pyd.Submitter(plugin="cf", n_procs=procs) as sub:
sub(runnable=task)
if __name__ == "__main__":
convert()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment