|
from pprint import pprint |
|
import itertools |
|
import dask |
|
|
|
import dask_image.ndmeasure |
|
import numpy as np |
|
import dask.array as da |
|
import tifffile |
|
|
|
import matplotlib.pyplot as plt |
|
%matplotlib inline

# Load the sample image and crop to a 128x128 region so the demo stays small.
blobs = tifffile.imread('blobs.tif')
blobs = blobs[:128, :128]

# Chunk the array into a 2x2 grid (half the image extent per axis).
chunks = np.array(blobs.shape) // 2
dask_blobs = da.from_array(blobs, chunks=chunks)

# Fixed-threshold segmentation producing a boolean mask.
# NOTE(review): 120 looks hand-tuned for blobs.tif — confirm for other inputs.
thresh = 120
dask_bw = dask_blobs > thresh
threshold_image = dask_bw

# Connected-component labelling that works across chunk boundaries.
dask_label_image, num_labels = dask_image.ndmeasure.label(threshold_image)

print(num_labels.compute())
plt.imshow(dask_label_image)
|
|
|
import functools |
|
from itertools import product |
|
import operator |
|
|
|
import numpy as np |
|
import dask.array as da |
|
import dask.bag as db |
|
from dask.delayed import delayed |
|
import scipy.ndimage as ndi |
|
|
|
|
|
# Alias the labelled image for the per-block bounding-box computation below.
arr = dask_label_image
|
|
|
def dask_block_location(block_id, chunks):
    """Return the element offset of a dask block within the full array.

    ``block_id`` is a per-dimension block index and ``chunks`` the dask
    chunk-size structure (tuple of per-axis chunk tuples).  Along each
    axis the offset equals the combined size of all chunks that precede
    the indexed block.
    """
    return tuple(
        sum(axis_chunks[:idx])
        for idx, axis_chunks in zip(block_id, chunks)
    )
|
|
|
# Pair every block index of `arr` with the corresponding sub-array:
# np.ndindex enumerates each block id, slices_from_chunks yields the
# matching slice tuple, and operator.getitem applies it to `arr`.
block_iter = zip(
    np.ndindex(*arr.numblocks),
    map(functools.partial(operator.getitem, arr),
        da.core.slices_from_chunks(arr.chunks))
)
|
|
|
@delayed
def func(x, array_location, max_label=None):
    """Compute per-label bounding boxes for one block, in global coordinates.

    ``x`` is a labelled integer array, ``array_location`` the block's offset
    within the full array (one entry per dimension).  Returns a dict mapping
    each label present in the block to a tuple of slices shifted by that
    offset.  ``max_label`` defaults to the block's maximum label value.
    """
    if max_label is None:
        max_label = np.max(x)

    boxes = {}
    for label, slices in zip(range(1, max_label + 1), ndi.find_objects(x)):
        if slices is None:
            continue
        # Shift each per-dimension slice by the block's global offset.
        boxes[label] = tuple(
            slice(sl.start + offset, sl.stop + offset, sl.step)
            for sl, offset in zip(slices, array_location)
        )
    # TODO: consider returning a dask/pandas structure instead of a plain dict
    return boxes
|
|
|
# Build one delayed bounding-box dict per block, each already shifted into
# global (whole-array) coordinates via the block's offset.
arrays = []
for block_id, block in block_iter:
    array_location = dask_block_location(block_id, arr.chunks)
    # print("block_id", block_id, "array_location", array_location)
    arrays.append(func(block, array_location))

# A bag lets us fold the per-block dicts together with a tree reduction.
bag = db.from_sequence(arrays)
|
|
|
# dictionary-merging approach adapted from Stack Overflow:
# https://stackoverflow.com/questions/16560840/python-merge-dictionaries-with-custom-merge-function
|
import itertools |
|
|
|
|
|
def merge(A, B):
    """Merge two label->bbox dicts; shared keys are combined with ``f()``."""
    # Keys present in exactly one dict are taken as-is.
    unique = {k: A[k] if k in A else B[k] for k in A.keys() ^ B.keys()}
    # Keys present in both dicts have their values merged by f().
    shared = {k: f(A[k], B[k]) for k in A.keys() & B.keys()}
    return {**unique, **shared}
|
|
|
def f(x, y):
    """Merge two bounding-box entries for the same label.

    Each argument is either ``None``, a single ``slice``, or a tuple of
    slices (one per dimension).  Returns the smallest slice (or tuple of
    slices) covering both inputs; a ``None`` input yields the other input
    unchanged.

    Raises:
        ValueError: if the arguments are not None/slice/tuple as above.
    """
    if x is None:
        return y
    if y is None:
        return x
    if isinstance(x, slice) and isinstance(y, slice):
        # Union of two 1-D extents: earliest start to latest stop.
        return slice(min(x.start, y.start), max(x.stop, y.stop), None)
    if isinstance(x, tuple) and isinstance(y, tuple):
        assert len(x) == len(y)
        # Merge dimension-by-dimension by recursing on the slice case.
        # Unlike the old inline min/max loop, this also tolerates None
        # entries inside the tuples.
        return tuple(f(x_slice, y_slice) for x_slice, y_slice in zip(x, y))
    raise ValueError(
        f"cannot merge {type(x).__name__} and {type(y).__name__}"
    )
|
|
|
|
|
def myfunc(iterable): |
|
iterable = [i.compute() if hasattr(i, "compute") else i for i in iterable] |
|
if len(iterable) == 1: |
|
result = iterable[0] |
|
return result |
|
elif len(iterable) == 2: |
|
A = iterable[0] |
|
B = iterable[1] |
|
result = merge(A, B) |
|
return result |
|
else: |
|
raise ValueError("Oops") |
|
|
|
|
|
# Tree-reduce the per-block dicts into one global label -> bbox mapping.
# The single-threaded scheduler keeps the reduction easy to debug;
# split_every=2 means myfunc only ever merges at most two dicts at a time.
with dask.config.set(scheduler='single-threaded'):
    result = bag.reduction(myfunc, myfunc, split_every=2).compute()

pprint(result)