Slightly simpler way to get the array location for any dask block
import numpy as np
import pandas as pd


def _find_slices(x):
    """An alternative to scipy.ndimage.find_objects."""
    unique_vals = np.unique(x)
    unique_vals = unique_vals[unique_vals != 0]  # drop the zero background label
    result = {}
    for val in unique_vals:
        positions = np.where(x == val)
        slices = tuple(slice(np.min(i), np.max(i) + 1, 1) for i in positions)
        result[val] = slices
    return pd.DataFrame.from_dict(result, orient='index')
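A quick usage check of _find_slices on a tiny labelled array (illustrative values, worked by hand):

x = np.array([
    [0, 1, 1, 0],
    [0, 1, 0, 2],
    [0, 0, 0, 2],
])
print(_find_slices(x))
# Expected: label 1 -> (slice(0, 2, 1), slice(1, 3, 1))
#           label 2 -> (slice(1, 3, 1), slice(3, 4, 1))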
from itertools import product

import dask.array as da


def dask_block_location(block_id, chunks):
    """Return the array offset (one value per axis) of the block at block_id."""
    array_location = []
    for idx, chunk in zip(block_id, chunks):
        array_location.append(sum(chunk[:idx]))
    return tuple(array_location)


arr = da.ones((12, 14), chunks=(2, 5))
for block_id in product(*(range(len(c)) for c in arr.chunks)):
    array_location = dask_block_location(block_id, arr.chunks)
    print("block_id", block_id, "array_location", array_location)
import functools
from itertools import product
import operator

import numpy as np
import dask.array as da
from dask.delayed import delayed
import scipy.ndimage as ndi

arr = dask_label_image


def dask_block_location(block_id, chunks):
    array_location = []
    for idx, chunk in zip(block_id, chunks):
        array_location.append(sum(chunk[:idx]))
    return tuple(array_location)


block_iter = zip(
    np.ndindex(*arr.numblocks),
    map(functools.partial(operator.getitem, arr),
        da.core.slices_from_chunks(arr.chunks))
)


@delayed
def func(x, array_location, max_label=None):
    if max_label is None:
        max_label = np.max(x)
    out = {}
    for key, values in zip(range(1, max_label + 1), ndi.find_objects(x)):
        if values is not None:
            # adjust slices for the array_location offset of this block
            adjusted_value = []
            for i, val in enumerate(values):
                adjusted_value.append(slice(val.start + array_location[i],
                                            val.stop + array_location[i],
                                            val.step))
            out[key] = tuple(adjusted_value)
    return out


arrays = []
for block_id, block in block_iter:
    array_location = dask_block_location(block_id, arr.chunks)
    # print("block_id", block_id, "array_location", array_location)
    arrays.append(func(block, array_location))
import numpy as np
import pandas as pd
import dask.dataframe as dd


def _combine_slices(slices):
    if len(slices) == 1:
        return slices[0]
    else:
        start = min([sl.start for sl in slices])
        stop = max([sl.stop for sl in slices])
        return slice(start, stop, 1)


def myfunc(x, ndim):
    x = x.dropna()
    data = {}
    for i in range(ndim):
        # column names are "<axis>_<suffix>", so match on the axis prefix
        slices = [x[ii] for ii in x.index if str(ii).startswith(str(i))]
        combined_slices = _combine_slices(slices)
        data[i] = combined_slices
    result = pd.Series(data=data, index=[i for i in range(ndim)], name=x.name)
    return result


data = {'0_x': {111: slice(1, 3, 1), 222: slice(3, 4, 1), 333: np.nan},
        '1_x': {111: slice(0, 2, 1), 222: slice(3, 5, 1), 333: np.nan},
        '0_y': {111: np.nan, 222: slice(3, 4, 1), 333: slice(0, 2, 1)},
        '1_y': {111: np.nan, 222: slice(5, 8, 1), 333: slice(7, 10, 1)}}
df = pd.DataFrame.from_dict(data)
ndim = 2
pandas_result = df.apply(myfunc, ndim=ndim, axis=1)
ddf = dd.from_pandas(df, npartitions=2)
meta = dd.utils.make_meta([(i, object) for i in range(ndim)])
dask_result = ddf.apply(myfunc, ndim=ndim, axis=1, meta=meta)
print(dask_result.compute())
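For reference, the merged result worked out by hand from the data above (both pandas_result and the computed dask_result should agree with this):

#               0                1
# 111  slice(1, 3, 1)   slice(0, 2, 1)
# 222  slice(3, 4, 1)   slice(3, 8, 1)
# 333  slice(0, 2, 1)   slice(7, 10, 1)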
import numpy as np
import pandas as pd
import dask.dataframe as dd
dict1 = {
    0: slice(1, 10, 1),
    1: slice(5, 15, 1),
    2: slice(7, 12, 1),
}
dict2 = {
    0: slice(4, 14, 1),
    3: slice(8, 12, 1),
    4: slice(6, 2, -1),
}
df1 = pd.DataFrame.from_dict(dict1, columns=["col"], orient='index')
df2 = pd.DataFrame.from_dict(dict2, columns=["col"], orient='index')
df = pd.merge(df1, df2, how="outer", left_index=True, right_index=True)
def isnan(value):
    try:
        if np.isnan(value):
            return True
    except TypeError:
        # np.isnan raises TypeError for non-numeric values such as slice objects
        if value is np.nan:
            return True
    return False


def join_func(data):
    left, right = data
    if isnan(left):
        return right
    elif isnan(right):
        return left
    else:
        start = min(left.start, right.start)
        stop = max(left.stop, right.stop)
        return slice(start, stop, 1)
# pandas result
result = df.apply(join_func, axis=1)
# dask result
rows_per_partition = 2
ddf = dd.from_pandas(df, chunksize=rows_per_partition)
meta = dd.utils.make_meta(("result", object))
dask_result = ddf.apply(join_func, axis=1, meta=meta)
print(dask_result.compute())
from pprint import pprint
import itertools
import dask
import dask_image.ndmeasure
import numpy as np
import dask.array as da
import tifffile
import matplotlib.pyplot as plt
%matplotlib inline
blobs = tifffile.imread('blobs.tif')
blobs = blobs[:128, :128]
chunks = np.array(blobs.shape) // 2
dask_blobs = da.from_array(blobs, chunks=chunks)
thresh = 120
dask_bw = dask_blobs > thresh
threshold_image = dask_bw
dask_label_image, num_labels = dask_image.ndmeasure.label(threshold_image)
print(num_labels.compute())
plt.imshow(dask_label_image)
import functools
from itertools import product
import operator

import numpy as np
import dask.array as da
import dask.bag as db
from dask.delayed import delayed
import scipy.ndimage as ndi

arr = dask_label_image


def dask_block_location(block_id, chunks):
    array_location = []
    for idx, chunk in zip(block_id, chunks):
        array_location.append(sum(chunk[:idx]))
    return tuple(array_location)


block_iter = zip(
    np.ndindex(*arr.numblocks),
    map(functools.partial(operator.getitem, arr),
        da.core.slices_from_chunks(arr.chunks))
)


@delayed
def func(x, array_location, max_label=None):
    if max_label is None:
        max_label = np.max(x)
    out = {}
    for key, values in zip(range(1, max_label + 1), ndi.find_objects(x)):
        if values is not None:
            # adjust slices for the array_location offset of this block
            adjusted_value = []
            for i, val in enumerate(values):
                adjusted_value.append(slice(val.start + array_location[i],
                                            val.stop + array_location[i],
                                            val.step))
            out[key] = tuple(adjusted_value)
    # should probably make the output a dask?/pandas? array??
    return out


arrays = []
for block_id, block in block_iter:
    array_location = dask_block_location(block_id, arr.chunks)
    # print("block_id", block_id, "array_location", array_location)
    arrays.append(func(block, array_location))

bag = db.from_sequence(arrays)
# adapted example of dictionary merging from stackoverflow
# https://stackoverflow.com/questions/16560840/python-merge-dictionaries-with-custom-merge-function
import itertools


def merge(A, B):
    # Start with the symmetric difference; keys either in A or B, but not both
    merged = {k: A.get(k, B.get(k)) for k in A.keys() ^ B.keys()}
    # Update with `f()` applied to the intersection of the keys
    merged.update({k: f(A[k], B[k]) for k in A.keys() & B.keys()})
    return merged


def f(x, y):
    if x is None:
        result = y
    elif y is None:
        result = x
    elif isinstance(x, slice) and isinstance(y, slice):
        start = min(x.start, y.start)
        stop = max(x.stop, y.stop)
        result = slice(start, stop, None)
    elif isinstance(x, tuple) and isinstance(y, tuple):
        assert len(x) == len(y)
        result = []
        for x_slice, y_slice in zip(x, y):
            start = min(x_slice.start, y_slice.start)
            stop = max(x_slice.stop, y_slice.stop)
            result.append(slice(start, stop, None))
        result = tuple(result)
    else:
        raise ValueError
    return result


def myfunc(iterable):
    iterable = [i.compute() if hasattr(i, "compute") else i for i in iterable]
    if len(iterable) == 1:
        result = iterable[0]
        return result
    elif len(iterable) == 2:
        A = iterable[0]
        B = iterable[1]
        result = merge(A, B)
        return result
    else:
        raise ValueError("Oops")


with dask.config.set(scheduler='single-threaded'):
    result = bag.reduction(myfunc, myfunc, split_every=2).compute()
pprint(result)

Outstanding questions

2021-07-15

  • How can I improve efficiency by avoiding ndi.find_objects?
    • I don't want to loop through all the None values we will inevitably get; that list will grow unreasonably large very quickly.
    • Approach: consider writing a custom alternative (a first sketch is the _find_slices helper at the top of this gist):
      • use numpy.unique to find the (non-zero) label integers
      • strip out the zero value and loop through the unique label ids
      • find the bounds of each label
  • How can I merge the dictionaries without doing unnecessary copying / work to create new dicts?
  • How do we know what the right partition size is for the dataframes?
    • Approach: run a reasonably sized computation while watching the dashboard (see the sketch below).
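A minimal sketch of that dashboard approach, assuming the dask.distributed scheduler on a local machine (the DataFrame and the npartitions values are placeholders to experiment with):

from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd

client = Client()                # local cluster: scheduler and workers on this machine
print(client.dashboard_link)     # open this URL to watch tasks and memory per partition

df = pd.DataFrame({"col": range(100_000)})   # placeholder data
for npartitions in (2, 8, 32):               # placeholder partition counts to compare
    ddf = dd.from_pandas(df, npartitions=npartitions)
    ddf.map_partitions(len).compute()        # watch the dashboard while this runs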