Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save erjel/2973cb935fde904ae55d050d55c02f90 to your computer and use it in GitHub Desktop.
Save erjel/2973cb935fde904ae55d050d55c02f90 to your computer and use it in GitHub Desktop.
import logging
from pathlib import Path
import webknossos as wk
from webknossos.annotation import Annotation
from webknossos.dataset import MagView
from webknossos.geometry import BoundingBox, Mag
def merge_with_fallback_layer(
output_path: Path,
annotation_zip_path: Path,
segmentation_layer_path: Path,
logger: logging.Logger,
) -> MagView:
assert not output_path.exists(), f"Dataset at {output_path} already exists"
logger.info("Merging volume annotation with fallback layer...")
# Prepare output dataset by creatign a shallow copy of the dataset
# determined by segmentation_layer_path, but do a deep copy of
# segmentation_layer_path itself (so that we can mutate it).
input_segmentation_dataset = wk.Dataset.open(segmentation_layer_path.parent)
output_dataset = wk.Dataset(output_path, scale=input_segmentation_dataset.scale)
output_layer = output_dataset.add_copy_layer(
segmentation_layer_path, segmentation_layer_path.name
)
input_segmentation_mag = input_segmentation_dataset.get_layer(
segmentation_layer_path.name
).get_best_mag()
with Annotation.load(
annotation_zip_path
).temporary_volume_layer_copy() as input_annotation_layer:
input_annotation_mag = input_annotation_layer.get_best_mag()
bboxes = list(
bbox for bbox in input_annotation_mag.get_bounding_boxes_on_disk()
)
output_mag = output_layer.get_mag(input_segmentation_mag.mag)
cube_size = output_mag.header.file_len * output_mag.header.block_len
chunks_with_bboxes = BoundingBox.group_boxes_with_aligned_mag(
bboxes, Mag(cube_size)
)
assert (
input_annotation_mag.header.file_len == 1
), "volume annotation must have file_len=1"
assert (
input_annotation_mag.header.voxel_type
== input_segmentation_mag.header.voxel_type
), "Volume annotation must have same dtype as fallback layer"
chunk_count = 0
for chunk, bboxes in chunks_with_bboxes.items():
chunk_count += 1
logger.info(f"Processing chunk {chunk_count}...")
data_buffer = output_mag.read(absolute_offset=chunk.topleft, size=chunk.size)[0, :, :, :]
for bbox in bboxes:
read_data = input_annotation_mag.read(absolute_offset=bbox.topleft, size=bbox.size)
data_buffer[bbox.offset(-chunk.topleft).to_slices()] = read_data
output_mag.write(data_buffer, absolute_offset=chunk.topleft)
return output_mag
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment