Replay documents from a bluesky run with realistic time spacing.
This example acquires data with bluesky, saves the documents to disk, accesses
the data using databroker, and finally uses replay
to push the documents into
a LiveTable
callback.
We will use caproto-shark to analyze CA network traffic. Note that the actual servers and clients involve may or may not be using caproto themselves; it does not matter.
Install caproto and pandas if they are not already installed.
pip install caproto[standard] pandas
from bluesky_widgets.models.auto_plot_builders import AutoPlotter | |
from bluesky_widgets.models.plot_builders import Lines | |
from bluesky_widgets.models.plot_specs import AxesSpec, FigureSpec | |
from bluesky_widgets.qt.figures import QtFigures | |
import databroker | |
import numpy as np | |
def xanes(monitor, detector): | |
absorption = np.log(np.array(monitor) / np.array(detector)).reshape(-1, 4) |
from ophyd.epics_motor import EpicsMotor, required_for_connection, motor_done_move, AlarmSeverity | |
PatchedEpicsMotor(EpicsMotor): | |
@required_for_connection | |
@motor_done_move.sub_value | |
def _move_changed(self, timestamp=None, value=None, sub_type=None, | |
**kwargs): | |
'''Callback from EPICS, indicating that movement status has changed''' | |
was_moving = self._moving |
from ophyd.sim import img as detector | |
from bluesky.plan_stubs import open_run, stage, unstage, close_run, trigger_and_read, subscribe, unsubscribe | |
from bluesky_live.bluesky_run import BlueskyRun, DocumentCache | |
def plan(): | |
""" | |
Take 10 shots with a detector and do some data processing on the result. | |
Key points |
from databroker import Broker | |
db = Broker.named('pdf') | |
# Ooverride the dask-coersion code with an improved version | |
# that deals with the incomplete or wrong shape metadata from ophyd. | |
# This is proposed to be added to databroker in | |
# https://github.com/bluesky/databroker/pull/596/. | |
import dask | |
import dask.array |
import copy | |
from event_model import DocumentRouter, RunRouter | |
class Selector(DocumentRouter): | |
def __init__(self, exclude=None, **kwargs): | |
self._exclude = exclude or [] | |
super().__init__(**kwargs) | |
def start(self, doc): |