Use Resource to get at correct shape for dask array
  1. Place shape_workaround.py from this gist (its contents are given below) in your current working directory, so Python can import it.
  2. Run databroker-unpack inplace /nsls2/tmp/export/fxi_karen_2019_05_03_attempt5 fxi_karen_2019_05_03_attempt5.
  3. Edit by hand the catalog YAML file that the databroker-unpack CLI placed, injecting the custom Filler subclass defined in shape_workaround.py (the + line below marks the addition):
# ~/.local/share/intake/databroker_unpack_fxi_karen_2019_05_03_attempt5.yml
 sources:
   fxi_karen_2019_05_03_attempt5:
     args:
       paths:
       - /nsls2/tmp/export/fxi_karen_2019_05_03_attempt5/documents/*.jsonl
       root_map:
         f1fdee311b9ea3f07adfc97e23010b41: /nsls2/tmp/export/fxi_karen_2019_05_03_attempt5/external_files/f1fdee311b9ea3f07adfc97e23010b41
+      filler_class: "shape_workaround.CustomFiller"
     driver: bluesky-jsonl-catalog
     metadata:
       generated_by:
         library: databroker_pack
         version: 0.3.0
       relative_paths:
       - ./documents/*.jsonl
  4. Access the data.
import databroker

catalog = databroker.catalog["fxi_karen_2019_05_03_attempt5"]
run = catalog[-1]
dataset = run.primary.read()  # an xarray Dataset
dataset["Andor_image"]  # shape (172, 10, 1080, 1280)
dataset["Andor_timestamps"]  # shape (172, 10)
# shape_workaround.py
import copy

import dask
import dask.array
import event_model
from databroker.core import extract_dtype
from event_model import (
    DataNotAccessible,
    MismatchedDataKeys,
    UnresolvableForeignKeyError,
    _attempt_with_retries,
)
def coerce_dask(handler_class, filler_state):
    # If the handler has its own delayed logic, defer to that.
    if hasattr(handler_class, 'return_type'):
        if handler_class.return_type['delayed']:
            return handler_class

    # Otherwise, provide best-effort dask support by wrapping each datum
    # payload in dask.array.from_delayed. This means that each datum will be
    # one dask task---it cannot be rechunked into multiple tasks---but that
    # may be sufficient for many handlers.
    class Subclass(handler_class):
        def __call__(self, *args, **kwargs):
            descriptor = filler_state.descriptor
            key = filler_state.key
            shape = extract_shape(descriptor, key, filler_state.resource)
            dtype = extract_dtype(descriptor, key)
            load_chunk = dask.delayed(super().__call__)(*args, **kwargs)
            return dask.array.from_delayed(load_chunk, shape=shape, dtype=dtype)

    return Subclass

# This adds a 'delayed' option to event_model.Filler's `coerce` parameter.
# By adding it via plugin, we avoid adding a dask.array dependency to
# event-model and we keep the fiddly hacks into extract_shape here in
# databroker, a faster-moving and less fundamental library than event-model.
event_model.register_coersion('delayed', coerce_dask, overwrite=True)  # sic: event-model's spelling
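Once registered, 'delayed' becomes a valid value for the coerce parameter accepted by event_model.Filler and its subclasses. A minimal sketch, where handler_registry is a placeholder (not defined in this gist) for your {spec: handler_class} mapping:

# Sketch only: handler_registry is a placeholder for your beamline's handlers.
filler = event_model.Filler(handler_registry, coerce='delayed', inplace=False)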
def extract_shape(descriptor, key, resource=None):
    """
    Work around bug in https://github.com/bluesky/ophyd/pull/746
    """
    # Ideally this code would just be
    #     descriptor['data_keys'][key]['shape']
    # but we have to do some heuristics to make up for errors in the reporting.
    data_key = descriptor['data_keys'][key]
    num_images = -1  # default, so the fixup below is defined even without a Resource
    if resource is not None:
        if "frame_per_point" in resource.get("resource_kwargs", {}):
            # The Resource document records how many frames each datum holds.
            num_images = resource["resource_kwargs"]["frame_per_point"]
        else:
            # Fall back to searching the object's configuration for a
            # '*num_images' entry.
            object_keys = descriptor.get('object_keys', {})
            for object_name, data_keys in object_keys.items():
                if key in data_keys:
                    break
            else:
                raise RuntimeError(f"Could not figure out shape of {key}")
            for k, v in descriptor['configuration'][object_name]['data'].items():
                if k.endswith('num_images'):
                    num_images = v
                    break
            else:
                num_images = -1
    # Broken ophyd reports (x, y, 0). We want (num_images, y, x).
    if len(data_key['shape']) == 3 and data_key['shape'][-1] == 0:
        x, y, _ = data_key['shape']
        shape = (num_images, y, x)
    else:
        shape = descriptor['data_keys'][key]['shape']
    # Replace any remaining -1 placeholders with num_images.
    shape_ = []
    for item in shape:
        if item == -1:
            shape_.append(num_images)
        else:
            shape_.append(item)
    return shape_
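To see the heuristic at work, here are hypothetical documents mimicking the broken ophyd report; the numbers are illustrative, not taken from the real run:

# Hypothetical: broken ophyd reported (x, y, 0) = (1280, 1080, 0) and the
# Resource says each datum holds 10 frames.
descriptor = {"data_keys": {"Andor_image": {"shape": [1280, 1080, 0]}}}
resource = {"resource_kwargs": {"frame_per_point": 10}}
extract_shape(descriptor, "Andor_image", resource)  # -> [10, 1080, 1280]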
class CustomFiller(event_model.Filler):
    """
    A Filler that stashes the Resource document on self._current_state so
    that coerce_dask's extract_shape can consult it.
    """
    def fill_event(self, doc, include=None, exclude=None, inplace=None):
        if inplace is None:
            inplace = self._inplace
        if inplace:
            filled_doc = doc
        else:
            filled_doc = copy.deepcopy(doc)
        descriptor = self._descriptor_cache[doc['descriptor']]
        from_datakeys = False
        self._current_state.descriptor = descriptor
        try:
            needs_filling = {key for key, val in doc['filled'].items()
                             if val is False}
        except KeyError:
            # This document is not telling us which, if any, keys are filled.
            # Infer that none of the external data is filled.
            needs_filling = {key for key, val in descriptor['data_keys'].items()
                             if 'external' in val}
            from_datakeys = True
        for key in needs_filling:
            if exclude is not None and key in exclude:
                continue
            if include is not None and key not in include:
                continue
            try:
                datum_id = doc['data'][key]
            except KeyError as err:
                if from_datakeys:
                    raise MismatchedDataKeys(
                        "The documents are not valid. Either because they "
                        "were recorded incorrectly in the first place, "
                        "corrupted since, or exercising a yet-undiscovered "
                        "bug in a reader. event['data'].keys() "
                        "must equal descriptor['data_keys'].keys(). "
                        f"event['data'].keys(): {doc['data'].keys()}, "
                        "descriptor['data_keys'].keys(): "
                        f"{descriptor['data_keys'].keys()}") from err
                else:
                    raise MismatchedDataKeys(
                        "The documents are not valid. Either because they "
                        "were recorded incorrectly in the first place, "
                        "corrupted since, or exercising a yet-undiscovered "
                        "bug in a reader. event['filled'].keys() "
                        "must be a subset of event['data'].keys(). "
                        f"event['data'].keys(): {doc['data'].keys()}, "
                        "event['filled'].keys(): "
                        f"{doc['filled'].keys()}") from err
            # Look up the cached Datum doc.
            try:
                datum_doc = self._datum_cache[datum_id]
            except KeyError as err:
                raise UnresolvableForeignKeyError(
                    datum_id,
                    f"Event with uid {doc['uid']} refers to unknown Datum "
                    f"datum_id {datum_id}") from err
            resource_uid = datum_doc['resource']
            # Look up the cached Resource.
            try:
                resource = self._resource_cache[resource_uid]
            except KeyError as err:
                raise UnresolvableForeignKeyError(
                    resource_uid,
                    f"Datum with id {datum_id} refers to unknown Resource "
                    f"uid {resource_uid}") from err
            # Stash state where coerce_dask's wrapped handler can see it,
            # including the Resource, before the handler is invoked.
            self._current_state.key = key
            self._current_state.resource = resource
            self._current_state.datum = datum_doc
            handler = self._get_handler_maybe_cached(resource)
            error_to_raise = DataNotAccessible(
                f"Filler was unable to load the data referenced by "
                f"the Datum document {datum_doc} and the Resource "
                f"document {resource}.")
            payload = _attempt_with_retries(
                func=handler,
                args=(),
                kwargs=datum_doc['datum_kwargs'],
                intervals=[0] + self.retry_intervals,
                error_to_catch=IOError,
                error_to_raise=error_to_raise)
            # Here we are intentionally modifying doc in place.
            filled_doc['data'][key] = payload
            filled_doc['filled'][key] = datum_id
        self._current_state.key = None
        self._current_state.descriptor = None
        self._current_state.resource = None
        self._current_state.datum = None
        return filled_doc
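Outside the catalog machinery, the filler can also be driven by hand; Filler instances are callable on (name, doc) pairs. A sketch, where handler_registry and docs are placeholders for a handler mapping and a document stream (e.g. replayed from the unpacked JSONL):

# Sketch only: handler_registry and docs are placeholders.
filler = CustomFiller(handler_registry, coerce='delayed', inplace=False)
for name, doc in docs:
    name, doc = filler(name, doc)  # Events emerge with dask arrays filled in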