Skip to content

Instantly share code, notes, and snippets.

@danielballan
Last active October 29, 2020 18:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save danielballan/eb42d371762aa035c2bc0c277153843c to your computer and use it in GitHub Desktop.
Save danielballan/eb42d371762aa035c2bc0c277153843c to your computer and use it in GitHub Desktop.
Patch databroker to enable reading PDF TIFFs lazily
from databroker import Broker
# Open the catalog registered under the name 'pdf'.
# NOTE(review): presumably this needs a databroker configuration called
# 'pdf' on this machine -- confirm before running elsewhere.
db = Broker.named('pdf')
# Override the dask-coercion code with an improved version
# that deals with the incomplete or wrong shape metadata from ophyd.
# This is proposed to be added to databroker in
# https://github.com/bluesky/databroker/pull/596/.
import dask
import dask.array
import event_model
def coerce_dask(handler_class, filler_state):
    """
    Return a handler class whose calls produce dask arrays.

    Parameters
    ----------
    handler_class : type
        Handler class to adapt. If it has a ``return_type`` mapping with a
        truthy ``'delayed'`` entry it is returned unchanged.
    filler_state : object
        Object exposing ``descriptor``, ``key`` and ``resource`` attributes.
        They are read each time the wrapped handler is called, not when this
        function runs.

    Returns
    -------
    type
        Either ``handler_class`` itself or a subclass of it whose
        ``__call__`` wraps the payload in a dask array.
    """
    # If the handler has its own delayed logic, defer to that. Use .get() so
    # that a ``return_type`` without a 'delayed' entry is treated as "not
    # delayed" instead of raising KeyError.
    return_type = getattr(handler_class, 'return_type', None)
    if return_type and return_type.get('delayed'):
        return handler_class

    # Otherwise, provide best-effort dask support by wrapping each datum
    # payload in dask.array.from_delayed. This means that each datum will be
    # one dask task---it cannot be rechunked into multiple tasks---but that
    # may be sufficient for many handlers.
    class Subclass(handler_class):

        def __call__(self, *args, **kwargs):
            descriptor = filler_state.descriptor
            key = filler_state.key
            shape = extract_shape(descriptor, key, filler_state.resource)
            # No shape at all, or an undetermined size (-1 or 0) in the
            # shape: abandon laziness, as it will not work. Load the payload
            # eagerly and only wrap the result.
            if shape is None or any(s <= 0 for s in shape):
                return dask.array.from_array(super().__call__(*args, **kwargs))
            dtype = extract_dtype(descriptor, key)
            load_chunk = dask.delayed(super().__call__)(*args, **kwargs)
            return dask.array.from_delayed(load_chunk, shape=shape, dtype=dtype)

    return Subclass
# This adds a 'delayed' option to event_model.Filler's `coerce` parameter.
# By adding it via plugin, we avoid adding a dask.array dependency to
# event-model and we keep the fiddly hacks in extract_shape here in
# databroker, a faster-moving and less fundamental library than event-model.
# NOTE(review): overwrite=True is presumably what lets this replace a
# 'delayed' coercion that is already registered -- confirm against the
# event_model.register_coercion documentation.
event_model.register_coercion('delayed', coerce_dask, overwrite=True)
def extract_shape(descriptor, key, resource=None):
    """
    Patch up misreported 'shape' metadata in old documents.

    This uses heuristics to guess whether the reported shape looks wrong and,
    if so, to determine the right one. Once historical data has been fixed,
    this function will be reduced to::

        return descriptor['data_keys'][key]['shape']

    Parameters
    ----------
    descriptor : dict
        Event descriptor; ``descriptor['data_keys'][key]['shape']`` is the
        reported shape.
    key : str
        Name of the data key whose shape is wanted.
    resource : dict, optional
        Resource document for the datum being loaded. When omitted there is
        nothing to base a correction on and the reported shape is returned
        unchanged.

    Returns
    -------
    list or tuple
        The best-guess shape. It may still contain -1 or 0 when the number of
        images could not be determined; callers should treat such a shape as
        unknown.

    Raises
    ------
    RuntimeError
        If ``resource`` has no 'frame_per_point' and ``key`` is not listed
        under any entry of ``descriptor['object_keys']``.
    """
    reported = descriptor['data_keys'][key]['shape']
    if resource is None:
        # Without a resource we cannot infer num_images. Hand back the
        # metadata as reported rather than falling through with no result.
        return reported

    resource_kwargs = resource.get("resource_kwargs", {})
    if "frame_per_point" in resource_kwargs:
        # This is a strong signal that the correct num_images value is here.
        num_images = resource_kwargs["frame_per_point"]
    else:
        # Otherwise try to find something ending in 'num_images' in the
        # configuration dict associated with this device.
        object_keys = descriptor.get('object_keys', {})
        for object_name, data_keys in object_keys.items():
            if key in data_keys:
                break
        else:
            raise RuntimeError(f"Could not figure out shape of {key}")
        config = descriptor['configuration'][object_name]['data']
        # First matching entry wins; -1 marks "could not be determined".
        num_images = next(
            (v for k, v in config.items() if k.endswith('num_images')),
            -1,
        )

    # Work around bug in https://github.com/bluesky/ophyd/pull/746
    # Broken ophyd reports (x, y, 0). We want (num_images, y, x).
    if len(reported) == 3 and reported[-1] == 0:
        x, y, _ = reported
        shape = (num_images, y, x)
    else:
        shape = reported

    if num_images == -1:
        # Along this path, we have no way to make dask work. The calling code
        # will fall back to numpy.
        return shape

    # Along this path, we have inferred that a -1 in here is num_images,
    # extracted based on inspecting the resource or the descriptor,
    # and we should replace -1 with that.
    return [num_images if item == -1 else item for item in shape]
def extract_dtype(descriptor, key):
    """
    Choose a dtype for ``key`` from the descriptor's reported data type.

    Documents currently report jsonschema type names. The name 'array' is
    replaced by ``float``; any other reported value is returned unchanged.
    """
    declared = descriptor['data_keys'][key]['dtype']
    # 'array' carries no element type, so float is only a guess.
    return float if declared == 'array' else declared
# Example usage:
# Look up a single run by its unique ID through the catalog's `v2` accessor.
uid = 'b135a3b2-811e-48d3-97b9-b2e01bb5e609'
h = db.v2[uid]
# Select the 'pe1c_image' field from the `to_dask()` view of the 'primary'
# stream (presumably backed by lazy dask arrays -- confirm).
# NOTE(review): the result is not bound to a name, so this line only displays
# something in an interactive session or notebook.
h.primary.to_dask()["pe1c_image"]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment