@clbarnes
Last active May 17, 2022 10:53
DEPRECATED: use https://github.com/clbarnes/bdv_meta/blob/main/add_downsamples.py instead. Script for (somewhat safely) adding metadata required by BigDataViewer (and optionally n5-viewer) to multiscale datasets stored in N5 containers.
#!/usr/bin/env python
"""
Script for (somewhat safely) adding metadata required by BigDataViewer
(and optionally n5-viewer) to multiscale datasets stored in N5 containers.
"""
from dataclasses import dataclass
import json
from pathlib import Path
from argparse import ArgumentParser
import re
import typing as tp
from collections.abc import MutableMapping
import logging
logger = logging.getLogger(__name__)
ATTRS_FILE = "attributes.json"
ARRAY_ATTR_KEYS = {"dimensions", "dataType", "blockSize", "compression"}
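# SCALE_RE matches scale level directory names such as "s0", "s1", ...
# UNIT_RE matches an SI-prefixed metre, second or hertz unit, e.g. "nm", "um", "GHz".
# RESOLUTION_RE matches a length with an optional unit, e.g. "4", "4.0nm", "0.5 um".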
SCALE_RE = re.compile(r"s(\d+)")
unit_re_str = r"([YZEPTGMkhdcmuμnpfazy]|da)?(m|s|Hz)"
UNIT_RE = re.compile(unit_re_str)
RESOLUTION_RE = re.compile(
r"(?P<value>(\d*\.?)?\d+)\s*(?P<unit>" + unit_re_str + ")?"
)
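# Jso: any JSON-serialisable value (nested dicts/lists of numbers, bools and
# strings) or None; used to sanity-check attribute keys and values below.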
Jso = tp.Optional[tp.Union[tp.Dict[str, "Jso"], tp.List["Jso"], int, float, bool, str]]
def check_key(k: str):
if not isinstance(k, (str, bytes)):
raise TypeError(f"Not a valid key: {repr(k)}")
def check_value(v: Jso):
if isinstance(v, list):
for item in v:
check_value(item)
elif isinstance(v, dict):
for k, val in v.items():
check_key(k)
check_value(val)
elif v is not None and not isinstance(v, (int, float, bool, str, bytes)):
raise TypeError(f"Not a valid value: {repr(v)}")
class ArrayAttrs(tp.TypedDict):
dimensions: tp.List[int]
dataType: str
blockSize: tp.List[int]
compression: tp.Union[tp.Dict[str, Jso], str]
class N5Attrs(MutableMapping):
def __init__(self, d: tp.Dict[str, Jso], gentle=True) -> None:
check_value(d)
self._d: tp.Dict[str, Jso] = d
self.gentle = gentle
def array_meta(self) -> tp.Optional[ArrayAttrs]:
try:
return ArrayAttrs(**{k: self._d[k] for k in ARRAY_ATTR_KEYS})
except KeyError:
return None
def is_array(self) -> bool:
return self.array_meta() is not None
def ndim(self) -> tp.Optional[int]:
arr = self.array_meta()
if arr:
return len(arr["dimensions"])
return None
@classmethod
def from_dir(cls, dpath: Path, gentle=True):
if not dpath.is_dir():
raise FileNotFoundError(f"Directory does not exist: {dpath}")
attr_path = dpath / ATTRS_FILE
if not attr_path.is_file():
return cls(dict(), gentle)
        with open(attr_path) as f:
d = json.load(f)
return cls(d, gentle)
def to_dir(self, dpath: Path, pretty=True):
if pretty:
kwargs = {"sort_keys": True, "indent": 2}
else:
kwargs = {}
with open(dpath / ATTRS_FILE, "w") as f:
json.dump(self._d, f, **kwargs)
def __iter__(self):
return self._d.__iter__()
def __len__(self):
return self._d.__len__()
def __getitem__(self, key: str) -> Jso:
check_key(key)
return self._d.__getitem__(key)
def __delitem__(self, key: str):
check_key(key)
if key in ARRAY_ATTR_KEYS:
raise ValueError(f"Cannot write reserved keys: '{key}'")
if self.gentle and key in self._d:
raise ValueError(f"Cannot delete key: '{key}'")
        del self._d[key]
def __setitem__(self, key: str, value: Jso):
check_key(key)
if key in ARRAY_ATTR_KEYS:
raise ValueError(f"Cannot write reserved keys: '{key}'")
key_msg = f"Key already exists: '{key}'"
if key in self._d:
if self.gentle:
raise ValueError(key_msg)
else:
logger.warning(key_msg)
check_value(value)
return self._d.__setitem__(key, value)
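# A minimal usage sketch for N5Attrs (paths and keys are hypothetical):
#
#   attrs = N5Attrs.from_dir(Path("/data/example.n5/raw/s0"))
#   if attrs.is_array():
#       print(attrs.ndim(), attrs["dataType"])
#   attrs["myKey"] = "myValue"  # raises ValueError if the key exists and gentle=True
#   attrs.to_dir(Path("/data/example.n5/raw/s0"))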
def check_scales(dpath: Path) -> tp.Optional[tp.Tuple[int, int]]:
"""Number of scale levels, dimensionality"""
scale_ndims = set()
scales = []
for child in dpath.iterdir():
if not child.is_dir():
continue
        m = SCALE_RE.fullmatch(child.name)
if not m:
logger.info(f"Non-scale child found: {child}")
return None
try:
attrs = N5Attrs.from_dir(child)
except FileNotFoundError:
logger.info(f"Non-N5 child found: {child}")
return None
if not attrs.is_array():
logger.info(f"Non-array child found: {child}")
scales.append(int(child.name[1:]))
scale_ndims.add(attrs.ndim())
if not scale_ndims:
logger.info(f"No scale children found in {dpath}")
return None
if len(scale_ndims) > 1:
logger.info(f"Scale children have inconsistent dimensionality: {dpath}")
return None
if sorted(scales) != list(range(len(scales))):
logger.info("Scale children are not consecutive from 0")
return None
return len(scales), scale_ndims.pop()
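# A group that passes check_scales() therefore looks something like:
#
#   group/
#     attributes.json    (group-level metadata, written by this script)
#     s0/attributes.json (N5 array metadata) plus block data
#     s1/attributes.json ...
#     s2/attributes.json ...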
def parse_scales(s):
    return [[int(c.strip()) for c in lvl.split(",")] for lvl in s.split(":")]
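# e.g. parse_scales("2,2,1:4,4,2") -> [[2, 2, 1], [4, 4, 2]]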
@dataclass
class Length:
magnitude: float
unit: tp.Optional[str]
@classmethod
def from_str(cls, s: str):
logger.debug("Parsing length '%s'", s)
m = RESOLUTION_RE.match(s.strip())
if m is None:
raise ValueError(f"Resolution could not be parsed: '{s}'")
d = m.groupdict()
unit = d.get("unit")
if not unit:
unit = None
logger.debug("Got value %s", d["value"])
logger.debug("Got unit %s", unit)
return cls(float(d["value"]), unit)
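# e.g. Length.from_str("4.0nm") -> Length(magnitude=4.0, unit="nm")
#      Length.from_str("5")     -> Length(magnitude=5.0, unit=None)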
def parse_resolution(s: str) -> tp.List[Length]:
return [Length.from_str(l_str) for l_str in s.split(",")]
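# e.g. parse_resolution("4nm,4nm,40nm") -> [Length(4.0, "nm"), Length(4.0, "nm"), Length(40.0, "nm")]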
def validate_unit(s: str) -> str:
    if not UNIT_RE.fullmatch(s):
raise ValueError(f"Not a valid unit: '{s}'")
return s
def main(args=None):
parser = ArgumentParser(description=__doc__)
parser.add_argument(
"group", type=Path, help="Path to directory which contains scale level arrays"
)
parser.add_argument(
"resolution",
type=parse_resolution,
help="Resolution, optionally with units, of scale level 0. Given as comma-separated string '1nm,2um,3GHz'. If data are isotropic, a single length can be given.",
)
parser.add_argument(
"downsamplingFactors",
type=parse_scales,
help="Downscaling factors relative to scale level 0, given as colon-separated comma-separated strings. If a downsampling factor is isotropic, a single length can be given. e.g. '3,3,1:2,2,1:2:2:2'",
)
parser.add_argument(
"-u",
"--unit",
type=validate_unit,
help="Default unit if not given in resolution",
)
parser.add_argument(
"-n",
"--n5-viewer",
action="store_true",
help="Add additional metadata for compatibility with n5-viewer",
)
parser.add_argument(
"-f",
"--force",
action="store_true",
help="Overwrite keys which already exist in the attributes file",
)
parsed = parser.parse_args(args)
resolution = []
units = []
for length in parsed.resolution:
resolution.append(length.magnitude)
if length.unit is None:
if parsed.unit is None:
raise ValueError("Units must be given")
units.append(parsed.unit)
else:
units.append(length.unit)
nlevels_ndims = check_scales(parsed.group)
if nlevels_ndims is None:
raise ValueError(f"Path does not seem to be a scale directory: {parsed.groups}")
nlevels, ndims = nlevels_ndims
if len(resolution) != ndims:
if len(resolution) == 1:
resolution = resolution * ndims
units = units * ndims
else:
raise ValueError(
f"Data has {ndims} dimensions, resolution argument has {len(resolution)}"
)
if nlevels != len(parsed.downsamplingFactors) + 1:
logger.warning(
"Data has %s scale levels, downsampling_factors arg implies %s",
nlevels,
len(parsed.downsamplingFactors) + 1,
)
downsampling = []
for df in parsed.downsamplingFactors:
if len(df) != ndims:
if len(df) == 1:
df = df * ndims
else:
raise ValueError(
f"Data has {ndims} dimensions, downsampling_factors argument has {len(df)}"
)
downsampling.append(df)
attrs = N5Attrs.from_dir(parsed.group, not parsed.force)
attrs["downsamplingFactors"] = downsampling
attrs["resolution"] = resolution
attrs["units"] = units
if parsed.n5_viewer:
if len(set(units)) != 1:
raise ValueError(
"n5-viewer mode only available when dimensions all have the same units"
)
attrs["pixelResolution"] = {"dimensions": resolution, "unit": units[0]}
attrs.to_dir(parsed.group)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
main()
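# With the hypothetical invocation from the header comment, the group-level
# attributes.json would end up containing something like:
#
#   {
#     "downsamplingFactors": [[2, 2, 1], [4, 4, 2]],
#     "pixelResolution": {"dimensions": [4.0, 4.0, 40.0], "unit": "nm"},
#     "resolution": [4.0, 4.0, 40.0],
#     "units": ["nm", "nm", "nm"]
#   }
#
# (any pre-existing keys in the file are preserved).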