@normanrz · Created May 20, 2020
Merge webKnossos volume annotations
# VOLUME ANNOTATION MERGE
#
# This script merges multiple volume annotations from webKnossos.
# In case of overlapping annotations, the last annotation wins.
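# (Annotations are applied in the order given on the command line, and
# only non-zero voxels overwrite; background never erases earlier labels.)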
#
# The --relabel flag relabels the segments of each annotation
# so that all segment ids are unique in the output annotation.
# This is useful when multiple annotators created segments with id 1.
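# Example: two annotations that each contain a single segment with id 1
# come out as segments 1 and 2, so neither overwrites the other.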
#
# 1. Download all annotations that you want to merge.
#    You will get a .zip file for each annotation.
# 2. Install Python 3 (if you don't have it already).
# 3. Install the dependencies of this script:
#      pip install -U wkw numpy
# 4. Run the script from the terminal:
#      python merge_volume.py volume0.zip volume1.zip volume2.zip
#    or, with relabeling:
#      python merge_volume.py --relabel volume0.zip volume1.zip volume2.zip
# 5. The script will write an out.zip file that
#    you can upload to webKnossos.
#
# License: MIT, scalable minds
import wkw
from zipfile import ZipFile
import sys
from glob import iglob
from uuid import uuid4
import os
import re
import numpy as np
from shutil import rmtree
from argparse import ArgumentParser
# Consts
path = os.path
BUCKET_SIZE = 32
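# wkw stores volume data in buckets of BUCKET_SIZE^3 voxels; bucket indices
# parsed from file paths are multiplied by BUCKET_SIZE to get voxel offsets.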
out_folder = str(uuid4())
# Prelude
parser = ArgumentParser(description="Merge webKnossos volume annotations")
parser.add_argument(
    "--relabel",
    action="store_true",
    help="Relabel all segments with a new unique id in order to avoid label collisions.",
)
parser.add_argument("zip_files", nargs="+", help="Volume annotation files (.zip)")
args = parser.parse_args()
if len(args.zip_files) == 0:
    print("Please supply volume annotations as downloaded from webKnossos")
    sys.exit(1)
print("Merging {} annotations: {}".format(len(args.zip_files), args.zip_files))
# Unzip all annotation zips
folder_names = [str(uuid4()) for _ in args.zip_files]
for zip_file, folder_name in zip(args.zip_files, folder_names):
    os.makedirs(folder_name, exist_ok=True)
    # Each annotation zip contains an NML file and a nested data.zip
    # that holds the WKW volume data.
    with ZipFile(zip_file, "r") as zip_ref:
        zip_ref.extractall(folder_name)
    with ZipFile(path.join(folder_name, "data.zip"), "r") as zip_ref:
        zip_ref.extractall(folder_name)
print("Unpacked all volume annotations")
# Create output WKW
out_ds = wkw.Dataset.open(
    path.join(out_folder, "1"),
    wkw.Header(
        # uint32 labels, one bucket per file, LZ4HC compression
        voxel_type=np.uint32, file_len=1, block_type=wkw.Header.BLOCK_TYPE_LZ4HC
    ),
)
# Get buckets of all annotations
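# A wkw layer with file_len=1 stores one file per bucket at paths like
# z{z}/y{y}/x{x}.wkw, so the bucket coordinates can be parsed from the paths.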
CUBE_REGEX = re.compile(r"z(\d+)[/\\]y(\d+)[/\\]x(\d+)\.wkw$")


def list_buckets(layer_path):
    # Collect the (x, y, z) bucket coordinates of all WKW files in a layer.
    output = set()
    for filename in iglob(path.join(layer_path, "*", "*", "*.wkw")):
        m = CUBE_REGEX.search(filename)
        if m is not None:
            output.add((int(m.group(3)), int(m.group(2)), int(m.group(1))))
    return output
bucket_lists = [
    list_buckets(path.join(folder_name, "1")) for folder_name in folder_names
]
# Collect all unique labels
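# Labels are collected in a first pass so that --relabel can assign
# globally unique ids before any data is written.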
label_sets = []
for buckets, folder_name in zip(bucket_lists, folder_names):
    with wkw.Dataset.open(path.join(folder_name, "1")) as in_ds:
        label_set = set()
        for (x, y, z) in buckets:
            # Convert bucket coordinates to a voxel offset
            offset = (x * BUCKET_SIZE, y * BUCKET_SIZE, z * BUCKET_SIZE)
            size = (BUCKET_SIZE, BUCKET_SIZE, BUCKET_SIZE)
            in_block = in_ds.read(offset, size)[0]
            label_set.update(in_block[in_block != 0])
        label_sets.append(label_set)
label_maps = []
if args.relabel:
    # Assign a fresh, globally unique id to every segment of every annotation.
    i = 1
    for label_set in label_sets:
        label_map = {}
        for label in label_set:
            label_map[label] = i
            i += 1
        label_maps.append(label_map)
    print("Relabelling {} unique labels".format(sum(len(m) for m in label_maps)))
else:
    # Keep the original ids; identical ids from different annotations
    # will end up as a single segment in the output.
    for label_set in label_sets:
        label_maps.append({label: label for label in label_set})
    print(
        "Found {} labels, not relabelling".format(sum(len(m) for m in label_maps))
    )
# Merge conflicting buckets (write all non-zero voxels; last write wins)
for buckets, folder_name, label_map in zip(bucket_lists, folder_names, label_maps):
    with wkw.Dataset.open(path.join(folder_name, "1")) as in_ds:
        for (x, y, z) in buckets:
            offset = (x * BUCKET_SIZE, y * BUCKET_SIZE, z * BUCKET_SIZE)
            size = (BUCKET_SIZE, BUCKET_SIZE, BUCKET_SIZE)
            # Read what has been merged so far, overlay this annotation's
            # non-zero voxels, and write the bucket back.
            out_block = out_ds.read(offset, size)[0]
            in_block = in_ds.read(offset, size)[0]
            for in_label, out_label in label_map.items():
                idx = in_block == in_label
                out_block[idx] = out_label
            out_ds.write(offset, out_block)
print("Merged all data")
# Create the output zip file
nml_file = next(iglob(path.join(folder_names[-1], "*.nml")))
with ZipFile("data.zip", "w") as zip_ref:
    for root, dirs, files in os.walk(path.join(out_folder, "1")):
        for file in files:
            # Strip the temporary folder prefix so that paths inside
            # data.zip start with the layer name ("1/...")
            file_path = path.join(root, file)
            zip_ref.write(file_path, arcname=path.relpath(file_path, out_folder))
with ZipFile("out.zip", "w") as zip_ref:
    zip_ref.write("data.zip")
    # Reuse the NML file of the last annotation for the merged output
    zip_ref.write(nml_file, arcname=path.basename(nml_file))
# Cleanup
for folder_name in folder_names:
    rmtree(folder_name)
rmtree(out_folder)
os.unlink("data.zip")
# Done
print("Created out.zip")