@danielballan
Last active May 17, 2021 18:59

Replay

Replay documents from a bluesky run with realistic time spacing.

Example

This example acquires data with bluesky, saves the documents to disk, accesses the data using databroker, and finally uses replay to push the documents into a LiveTable callback.

$ pip install -r requirements.txt
$ python example.py 
+-----------+------------+------------+
|   seq_num |       time |        det |
+-----------+------------+------------+
|         1 | 08:51:53.8 |      1.000 |
|         2 | 08:51:54.8 |      1.000 |
|         3 | 08:51:56.8 |      1.000 |
|         4 | 08:51:59.8 |      1.000 |
|         5 | 08:52:03.8 |      1.000 |
+-----------+------------+------------+
generator count ['c893bfaa'] (scan num: 1)

example.py

import tempfile

from bluesky import RunEngine
from bluesky.plans import count
from ophyd.sim import det, img
from suitcase.msgpack import Serializer

# Take sample data.
RE = RunEngine()
data_dir = tempfile.TemporaryDirectory().name
with Serializer(data_dir) as serializer:
    # Take five exposures with a linearly increasing
    # time delay between them.
    # Emit the documents to a Serializer that writes msgpack.
    # Use an array detector with externally-stored files (img) and a simple
    # scalar detector (det).
    RE(count([img, det], num=5, delay=(1, 2, 3, 4)), serializer)

# Replay sample data.
from replay import replay
from event_model import DocumentRouter
from bluesky.callbacks import LiveTable as LiveTable_
from databroker._drivers.msgpack import BlueskyMsgpackCatalog


class LiveTable(LiveTable_, DocumentRouter):
    """
    Patch up LiveTable to make it aware of the paginated types, event_page and
    datum_page. The upcoming release of bluesky will fix this.
    """
    pass


catalog = BlueskyMsgpackCatalog(f"{data_dir}/*.msgpack")
run = catalog[-1]
replay(run.canonical(fill="no"), LiveTable(["motor", "det"]))
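
replay also accepts burst and delay keyword arguments (defined in replay.py below). As a quick sketch, not part of the original example, the same run can be pushed through as fast as possible with a bare printing callback:

# Sketch (not in the original gist): skip the simulated timing and
# simply print each document's name as it is emitted.
replay(run.canonical(fill="no"), lambda name, doc: print(name), burst=True)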

replay.py

#!/usr/bin/env python3
import argparse
import collections
import sys
import time

from event_model import unpack_event_page, unpack_datum_page


def log(*args, **kwargs):
    kwargs.setdefault("file", sys.stderr)
    return print(*args, **kwargs)

def replay(gen, callback, burst=False, delay=0):
    """
    Emit documents to a callback with realistic time spacing.

    Parameters
    ----------
    gen: iterable
        Expected to yield (name, doc) pairs
    callback: callable
        Expected signature: callback(name, doc)
    burst: boolean, optional
        If True, turn off simulated timing and emit documents as fast as
        possible. False by default.
    delay: float, optional
        Constant delay (in seconds) to add between documents. 0 by default.
    """
    gen = iter(gen)
    cache = collections.deque()
    name, doc = next(gen)
    if name != "start":
        raise ValueError("Expected gen to start with a RunStart document")
    # Compute time difference between now and the time that this run started.
    offset = time.time() - doc["time"]
    callback(name, doc)
    for name, doc in gen:
        if name == "event_page":
            # Expand this EventPage into individual Events.
            for event in unpack_event_page(doc):
                _process_document("event", event, cache, offset, callback, burst, delay)
        elif name == "datum_page":
            # Expand this DatumPage into individual Datum documents.
            for datum in unpack_datum_page(doc):
                _process_document("datum", datum, cache, offset, callback, burst, delay)
        else:
            _process_document(name, doc, cache, offset, callback, burst, delay)

_DOCUMENTS_WITHOUT_A_TIME = {"datum", "datum_page", "resource"}


def _process_document(name, doc, cache, offset, callback, burst, delay):
    if name in _DOCUMENTS_WITHOUT_A_TIME:
        # The bluesky RunEngine emits these documents immediately
        # before emitting an Event, which does have a time. Lacking
        # more specific timing info, we'll cache these and then emit
        # them in a burst before the next document with an associated time.
        cache.append((name, doc))
    else:
        if not burst:
            # Sleep until this document is as old, relative to the start of
            # the replay, as it originally was relative to the run start.
            time.sleep(max(0, offset - (time.time() - doc["time"])))
        while cache:
            # Emit any cached documents without a time in a burst.
            time.sleep(delay)
            callback(*cache.popleft())
        # Emit this document.
        time.sleep(delay)
        callback(name, doc)

def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--zmq", help="0MQ address")
    parser.add_argument(
        "--kafka-bootstrap-servers",
        help="Kafka servers, comma-separated string, e.g. "
        "kafka1.nsls2.bnl.gov:9092,kafka2.nsls2.bnl.gov:9092,kafka3.nsls2.bnl.gov:9092",
    )
    parser.add_argument(
        "--kafka-topic", help="Kafka topic, e.g. bmm.bluesky.runengine.documents"
    )
    parser.add_argument("--kafka-key", help="Kafka key")
    catalog_group = parser.add_argument_group(
        description="Which catalog should we look for Runs in?"
    ).add_mutually_exclusive_group()
    catalog_group.add_argument("--catalog", help="Databroker catalog name")
    catalog_group.add_argument("--msgpack", help="Directory of msgpack files")
    filter_group = parser.add_argument_group(
        description="Which Runs should we replay? Must specify one of these:"
    ).add_mutually_exclusive_group()
    filter_group.add_argument(
        "--all", action="store_true", help="Replay every Run in this Catalog."
    )
    filter_group.add_argument(
        "-q",
        "--query",
        type=str,
        action="append",
        help=(
            "MongoDB-style query or databroker.queries Query. "
            "Narrow results by chaining multiple queries like "
            "-q \"TimeRange(since='2020')\" "
            "-q \"{'sample': 'Au'}\""
        ),
    )
    filter_group.add_argument(
        "--uids",
        type=argparse.FileType("r"),
        action="append",
        help="Newline-separated (partial) uids. Lines starting with # are skipped.",
    )
    parser.add_argument(
        "--burst",
        action="store_true",
        help=(
            "Set this to turn off simulated timing and push documents "
            "through as fast as possible."
        ),
    )
    parser.add_argument(
        "--delay",
        type=float,
        default=0,
        help="Add a constant delay (in seconds) between documents.",
    )
    args = parser.parse_args(argv)
    if args.zmq:
        from bluesky.callbacks.zmq import Publisher
        publisher = Publisher(args.zmq)
    elif args.kafka_bootstrap_servers and args.kafka_topic:
        from bluesky_kafka import Publisher
        bootstrap_servers = [s.strip() for s in args.kafka_bootstrap_servers.split(",")]
        publisher = Publisher(args.kafka_topic, bootstrap_servers, args.kafka_key)
    else:
        raise ValueError("Either --zmq or the --kafka-... parameters are required.")
    if args.msgpack:
        from databroker._drivers.msgpack import BlueskyMsgpackCatalog
        catalog = BlueskyMsgpackCatalog([args.msgpack])
    elif args.catalog:
        import databroker
        catalog = databroker.catalog[args.catalog]
    else:
        parser.error("Must specify a catalog via --catalog ... or --msgpack ...")
    if args.query or args.all:
        import databroker.queries
        if args.all:
            # --all is an alias for --query "{}"
            raw_queries = ["{}"]
        else:
            raw_queries = args.query
        queries = []
        ns = vars(databroker.queries)
        for raw_query in raw_queries:
            # Parse string like "{'scan_id': 123}" to dict.
            try:
                query = dict(eval(raw_query, ns))
            except Exception:
                raise ValueError(f"Could not parse query {raw_query}.")
            queries.append(query)
        combined_query = {"$and": queries}
        # HACK We need no_cursor_timeout only until databroker learns to
        # seamlessly remake cursors when they time out.
        try:
            results = catalog.search(combined_query, no_cursor_timeout=True)
        except TypeError:
            # Drivers that are not Mongo drivers will not support
            # no_cursor_timeout.
            results = catalog.search(combined_query)
        if not results:
            print(f"Query {combined_query} yielded no results. Exiting.")
            # This can be a result of bash mangling the query because the
            # caller forgot to escape the $'s.
            print(
                "If your query had dollar signs in it, remember to "
                "escape them like \\$."
            )
            sys.exit(1)
    elif args.uids:
        # Skip blank lines and commented lines.
        uids = []
        for uid_file in args.uids:
            uids.extend(
                line.strip()
                for line in uid_file.read().splitlines()
                if line and not line.startswith("#")
            )
        if not uids:
            print("Found empty input for --uids. Exiting.")
            sys.exit(1)
        results = {uid: catalog[uid] for uid in uids}
    else:
        parser.error(
            "Must specify which Runs to replay via --query ... or "
            "--uids ... or --all."
        )
    log(f"Replaying {len(results)} runs.")
    for uid, run in results.items():
        log(f"Replaying run {run}")
        replay(run.documents(fill="no"), publisher, burst=args.burst, delay=args.delay)
        log(f"Done with run {run}")
    log("Complete. Exiting.")


if __name__ == "__main__":
    main()
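
replay.py can also be run as a script, publishing the replayed documents over 0MQ or Kafka. A couple of hypothetical invocations for illustration (the msgpack directory, catalog name, and 0MQ address below are placeholders, not from the original gist):

$ python replay.py --msgpack /path/to/msgpack/data --all --zmq localhost:5567
$ python replay.py --catalog bmm -q "TimeRange(since='2020')" --burst --zmq localhost:5567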

requirements.txt

# Note that replay.py itself needs only the Python standard library and
# event_model. These requirements are needed to run the example.
bluesky
databroker >=1.0.0b6
ophyd
suitcase-msgpack