Skip to content

Instantly share code, notes, and snippets.

@danielballan
Last active September 24, 2020 21:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save danielballan/98ba5d886f7424b6f3972ad953d3bcd5 to your computer and use it in GitHub Desktop.
Save danielballan/98ba5d886f7424b6f3972ad953d3bcd5 to your computer and use it in GitHub Desktop.
Excluding fields from a suitcase Serializer
import copy
from event_model import DocumentRouter, RunRouter
class Selector(DocumentRouter):
    """Document router that strips selected fields before re-emitting.

    'start' and 'stop' documents are forwarded untouched.  'descriptor' and
    'event' documents are deep-copied, the excluded keys are removed from
    the copy, and the copy is emitted, so the incoming document is never
    modified.

    Parameters
    ----------
    exclude : str or iterable of str, optional
        Field name(s) to drop.  A bare string is treated as a single field
        name rather than being iterated character by character.  ``None``
        or an empty collection means nothing is removed.
    **kwargs
        Forwarded unchanged to ``DocumentRouter.__init__`` (the usage in
        this file passes ``emit=``).
    """

    def __init__(self, exclude=None, **kwargs):
        exclude = exclude or ()
        if isinstance(exclude, str):
            exclude = (exclude,)
        # Snapshot into a tuple: a one-shot iterable would otherwise be
        # exhausted after the first document, and later mutation of the
        # caller's list would silently change what gets excluded.
        self._exclude = tuple(exclude)
        super().__init__(**kwargs)

    def start(self, doc):
        """Forward the 'start' document unchanged."""
        self.emit('start', doc)

    def descriptor(self, doc):
        """Emit a copy of the descriptor without the excluded 'data_keys'."""
        edited = copy.deepcopy(doc)
        for key in self._exclude:
            edited['data_keys'].pop(key, None)
        self.emit('descriptor', edited)

    def event(self, doc):
        """Emit a copy of the event without the excluded fields.

        The keys are removed from 'data', 'timestamps' and, when present,
        'filled'.
        """
        # NOTE(review): deepcopy also duplicates the payloads that are about
        # to be dropped; kept so the emitted document shares nothing with
        # the incoming one.
        edited = copy.deepcopy(doc)
        for key in self._exclude:
            edited['data'].pop(key, None)
            edited['timestamps'].pop(key, None)
            # 'filled' may be absent; popping from a throwaway dict is a no-op.
            edited.get('filled', {}).pop(key, None)
        self.emit('event', edited)

    def stop(self, doc):
        """Forward the 'stop' document unchanged."""
        self.emit('stop', doc)
# Test
# `databroker` must be imported before it is used on the line below;
# without this import the script stops with a NameError.
import databroker
from suitcase.tiff_series import Serializer

# NOTE(review): handler_registry is not referenced by any code visible in
# this file; presumably it was meant to be handed to RunRouter -- confirm.
handler_registry = databroker.core.discover_handlers()
def factory1(name, doc):
    """Baseline factory: a bare Serializer with no Selector in front.

    This is expected to write TIFFs (into 'test_without_selector').

    ``name`` and ``doc`` are accepted but not used.  Returns a pair of
    lists: the serializer alone in the first, nothing in the second --
    presumably the (callbacks, subfactories) pair RunRouter expects; see the
    event_model documentation.
    """
    return [Serializer('test_without_selector')], []
def factory2(name, doc):
    """Factory with a Selector that excludes nothing.

    This is expected to write TIFFs (into 'test_with_selector_empty').

    ``name`` and ``doc`` are accepted but not used.  Returns a pair of
    lists: the selector alone in the first, nothing in the second.
    """
    tiff_writer = Serializer('test_with_selector_empty')
    passthrough = Selector(exclude=[], emit=tiff_writer)
    # Pin the serializer to the selector so it stays alive after this
    # function returns (presumably the router holds `emit` only weakly --
    # confirm against the event_model documentation).
    passthrough._stashed_hard_ref_to_serializer = tiff_writer
    return [passthrough], []
def factory3(name, doc):
    """Factory with a Selector that excludes the "img" field.

    This is NOT expected to write TIFFs (target directory is
    'test_with_selector_employed').

    ``name`` and ``doc`` are accepted but not used.  Returns a pair of
    lists: the selector alone in the first, nothing in the second.
    """
    tiff_writer = Serializer('test_with_selector_employed')
    img_filter = Selector(exclude=["img"], emit=tiff_writer)
    # Pin the serializer to the selector so it stays alive after this
    # function returns (presumably the router holds `emit` only weakly --
    # confirm against the event_model documentation).
    img_filter._stashed_hard_ref_to_serializer = tiff_writer
    return [img_filter], []
import databroker
from ophyd.sim import img as detector1, direct_img as detector2
from bluesky import RunEngine
from bluesky.plans import count
# Wire all three factories into one router and subscribe it to a fresh
# RunEngine so every run below is offered to each factory.
rr = RunRouter([factory1, factory2, factory3])
RE = RunEngine()
RE.subscribe(rr)

# Three acquisitions: each detector on its own, then both together.
for detectors in ([detector1], [detector2], [detector1, detector2]):
    RE(count(detectors))
# Now the directory test_without_selector contains TIFFs. The directory test_with_selector_employed isn't even created.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment