|
from databroker.core import extract_dtype |
|
import dask |
|
import dask.array |
|
import event_model |
|
from event_model import UnresolvableForeignKeyError, DataNotAccessible, MismatchedDataKeys, _attempt_with_retries |
|
import copy |
|
|
|
def coerce_dask(handler_class, filler_state):
    """
    Wrap ``handler_class`` so that each datum it loads is a dask array.

    If the handler advertises native delayed support (via a
    ``return_type`` attribute with ``{'delayed': True}``), it is returned
    unchanged. Otherwise a subclass is returned whose ``__call__`` defers
    the real load with ``dask.delayed`` and exposes it through
    ``dask.array.from_delayed``.
    """
    # Handlers that already produce delayed results need no wrapping.
    if hasattr(handler_class, 'return_type'):
        if handler_class.return_type['delayed']:
            return handler_class

    # Best-effort dask support: each datum becomes exactly one dask task.
    # It cannot be rechunked into multiple tasks, but that is sufficient
    # for many handlers.
    class Subclass(handler_class):

        def __call__(self, *args, **kwargs):
            # Shape/dtype come from the descriptor that filler_state is
            # currently pointing at (set by the Filler before calling us).
            descriptor = filler_state.descriptor
            key = filler_state.key
            expected_shape = extract_shape(descriptor, key,
                                           filler_state.resource)
            expected_dtype = extract_dtype(descriptor, key)
            deferred_load = dask.delayed(super().__call__)(*args, **kwargs)
            return dask.array.from_delayed(deferred_load,
                                           shape=expected_shape,
                                           dtype=expected_dtype)

    return Subclass
|
|
|
|
|
# This adds a 'delayed' option to event_model.Filler's `coerce` parameter.
# By adding it via plugin, we avoid adding a dask.array dependency to
# event-model and we keep the fiddly hacks into extract_shape here in
# databroker, a faster-moving and less fundamental library than event-model.
# N.B. "coersion" (sic) is the spelling used by the event-model API itself;
# do not "correct" it here.
event_model.register_coersion('delayed', coerce_dask, overwrite=True)
|
|
|
|
|
def extract_shape(descriptor, key, resource=None):
    """
    Work around bug in https://github.com/bluesky/ophyd/pull/746

    Parameters
    ----------
    descriptor : dict
        EventDescriptor document. Must contain ``data_keys``; may contain
        ``object_keys`` and ``configuration``, which are consulted as a
        fallback heuristic for the number of images.
    key : str
        The data key whose shape is being resolved.
    resource : dict, optional
        Resource document. If its ``resource_kwargs`` carry
        ``frame_per_point``, that value is used as the number of images.

    Returns
    -------
    list
        The shape, with ``-1`` placeholders replaced by the inferred
        number of images where possible (they remain ``-1`` if unknown).

    Raises
    ------
    RuntimeError
        If the configuration heuristic is needed but *key* cannot be
        found in the descriptor's ``object_keys``.
    """
    # Ideally this code would just be
    #     descriptor['data_keys'][key]['shape']
    # but we have to do some heuristics to make up for errors in the reporting.
    data_key = descriptor['data_keys'][key]
    if resource is not None:
        if "frame_per_point" in resource.get("resource_kwargs", {}):
            num_images = resource["resource_kwargs"]["frame_per_point"]
        else:
            # Fall back to scanning the configuration of the object that
            # produced this key for an entry ending in 'num_images'.
            object_keys = descriptor.get('object_keys', {})
            for object_name, data_keys in object_keys.items():
                if key in data_keys:
                    break
            else:
                raise RuntimeError(f"Could not figure out shape of {key}")
            for k, v in descriptor['configuration'][object_name]['data'].items():
                if k.endswith('num_images'):
                    num_images = v
                    break
            else:
                num_images = -1
    else:
        # BUG FIX: previously num_images was left unassigned when no
        # Resource was supplied, causing a NameError below whenever the
        # shape needed repair. Use the -1 "unknown" placeholder instead.
        num_images = -1

    # Broken ophyd reports (x, y, 0). We want (num_images, y, x).
    if len(data_key['shape']) == 3 and data_key['shape'][-1] == 0:
        x, y, _ = data_key['shape']
        shape = (num_images, y, x)
    else:
        shape = descriptor['data_keys'][key]['shape']
    # Replace any remaining -1 placeholders with num_images (which may
    # itself be -1 if it could not be determined).
    shape_ = []
    for item in shape:
        if item == -1:
            shape_.append(num_images)
        else:
            shape_.append(item)
    return shape_
|
|
|
|
|
class CustomFiller(event_model.Filler):
    """
    An event_model.Filler that records per-datum state on
    ``self._current_state`` (descriptor, key, resource, datum) before
    invoking each handler, so that coercion wrappers can consult that
    state while loading.
    """

    def fill_event(self, doc, include=None, exclude=None, inplace=None):
        """
        Fill the external data references in an Event document.

        Parameters: ``doc`` is the Event; ``include``/``exclude`` restrict
        which keys are filled; ``inplace`` overrides the Filler-level
        default (``self._inplace``) when not None.

        Returns the filled document (``doc`` itself when filling in
        place, otherwise a deep copy).

        Raises MismatchedDataKeys when the Event/Descriptor documents are
        inconsistent, UnresolvableForeignKeyError when a Datum or
        Resource is unknown, and DataNotAccessible when the handler fails
        with IOError even after retries.
        """
        if inplace is None:
            inplace = self._inplace
        # Either mutate the caller's document or work on a private copy.
        if inplace:
            filled_doc = doc
        else:
            filled_doc = copy.deepcopy(doc)
        descriptor = self._descriptor_cache[doc['descriptor']]
        from_datakeys = False
        # Expose the descriptor to handler wrappers for the duration of
        # this call (presumably the same object coercion plugins see as
        # `filler_state` — confirm against event_model internals).
        self._current_state.descriptor = descriptor
        try:
            # Keys explicitly marked unfilled by the document itself.
            needs_filling = {key for key, val in doc['filled'].items()
                             if val is False}
        except KeyError:
            # This document is not telling us which, if any, keys are filled.
            # Infer that none of the external data is filled.
            needs_filling = {key for key, val in descriptor['data_keys'].items()
                             if 'external' in val}
            from_datakeys = True
        for key in needs_filling:
            # Honor caller-supplied key filters.
            if exclude is not None and key in exclude:
                continue
            if include is not None and key not in include:
                continue
            try:
                datum_id = doc['data'][key]
            except KeyError as err:
                # Tailor the error message to how needs_filling was built.
                if from_datakeys:
                    raise MismatchedDataKeys(
                        "The documents are not valid. Either because they "
                        "were recorded incorrectly in the first place, "
                        "corrupted since, or exercising a yet-undiscovered "
                        "bug in a reader. event['data'].keys() "
                        "must equal descriptor['data_keys'].keys(). "
                        f"event['data'].keys(): {doc['data'].keys()}, "
                        "descriptor['data_keys'].keys(): "
                        f"{descriptor['data_keys'].keys()}") from err
                else:
                    raise MismatchedDataKeys(
                        "The documents are not valid. Either because they "
                        "were recorded incorrectly in the first place, "
                        "corrupted since, or exercising a yet-undiscovered "
                        "bug in a reader. event['filled'].keys() "
                        "must be a subset of event['data'].keys(). "
                        f"event['data'].keys(): {doc['data'].keys()}, "
                        "event['filled'].keys(): "
                        f"{doc['filled'].keys()}") from err
            # Look up the cached Datum doc.
            try:
                datum_doc = self._datum_cache[datum_id]
            except KeyError as err:
                raise UnresolvableForeignKeyError(
                    datum_id,
                    f"Event with uid {doc['uid']} refers to unknown Datum "
                    f"datum_id {datum_id}") from err
            resource_uid = datum_doc['resource']
            # Look up the cached Resource.
            try:
                resource = self._resource_cache[resource_uid]
            except KeyError as err:
                raise UnresolvableForeignKeyError(
                    resource_uid,
                    f"Datum with id {datum_id} refers to unknown Resource "
                    f"uid {resource_uid}") from err
            # Publish the per-datum state BEFORE building/calling the
            # handler, so wrapped handlers can read it during __call__.
            self._current_state.key = key
            self._current_state.resource = resource
            self._current_state.datum = datum_doc
            handler = self._get_handler_maybe_cached(resource)
            error_to_raise = DataNotAccessible(
                f"Filler was unable to load the data referenced by "
                f"the Datum document {datum_doc} and the Resource "
                f"document {resource}.")
            # Retry transient IOErrors; the leading 0 makes the first
            # attempt immediate.
            payload = _attempt_with_retries(
                func=handler,
                args=(),
                kwargs=datum_doc['datum_kwargs'],
                intervals=[0] + self.retry_intervals,
                error_to_catch=IOError,
                error_to_raise=error_to_raise)
            # Record the payload; when inplace=True this intentionally
            # modifies the caller's doc.
            filled_doc['data'][key] = payload
            # NOTE(review): if the Event carried no 'filled' mapping (the
            # from_datakeys path above), this line would raise KeyError —
            # confirm whether such documents can reach this point.
            filled_doc['filled'][key] = datum_id
        # Clear the per-call state so stale values cannot leak into a
        # later fill.
        self._current_state.key = None
        self._current_state.descriptor = None
        self._current_state.resource = None
        self._current_state.datum = None
        return filled_doc