@tacaswell
Last active February 18, 2021 19:02

Files for working with reduced data from 2020-12 experiments at XPD

Collection of notebooks and files for working with the gpcam data:

  1. explore_pgcam_data.ipynb -> demo of how to pull reduced data out of the msgpack databroker (a short sketch of this follows the list)
  2. extract.py -> code run at XPD against the raw databroker to sort out which raw runs we are interested in
  3. reprocess.py -> code run to re-process the raw data and produce the reduced outputs
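
A minimal sketch of the kind of access the notebook demonstrates (the reduced/*.msgpack path and the original_start_uid / analysis_stage metadata keys are taken from the scripts below; reading the "primary" stream is an assumption about where the reduced data ends up):

    from databroker._drivers.msgpack import BlueskyMsgpackCatalog

    # catalog over the reduced-data msgpack files written by reprocess.py
    cat = BlueskyMsgpackCatalog(["reduced/*.msgpack"])

    for uid in cat.search({}):
        run = cat[uid]
        start = run.metadata["start"]
        # every reduced run records which raw run it came from
        print(uid, start["original_start_uid"], start.get("analysis_stage"))
        # read the reduced data itself as an xarray Dataset
        # ("primary" is assumed to be the stream the reduced documents land in)
        data = run.primary.read()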
explore_pgcam_data.ipynb: the rendered notebook itself could not be displayed.

extract.py:
from databroker import Broker
from databroker.queries import TimeRange
from databroker._drivers.msgpack import BlueskyMsgpackCatalog
import datetime
import itertools
import json
from collections import defaultdict
from pathlib import Path  # needed for writing ~/work_plan.json below

def summerize(by_sample):
    """Print per-sample run counts and the time range each sample spans."""
    tmp = []
    for k, v in by_sample.items():
        times = [h.metadata["start"]["time"] for h in v]
        start_time = min(times)
        stop_time = max(times)
        tmp.append(
            (
                start_time,
                f"{k!r:<25}:{len(v):<4} "
                + f"{datetime.datetime.fromtimestamp(start_time)} - "
                + f"{datetime.datetime.fromtimestamp(stop_time)}",
            )
        )
    for _, pp in sorted(tmp):
        print(pp)

def groupby_sample(hdrs):
    """Group headers by the 'sample_name' recorded in their start documents."""
    ret = defaultdict(list)
    for h in hdrs:
        ret[h.metadata["start"].get("sample_name", None)].append(h)
    return ret

def by_run_phase(by_sample, groups):
    """Merge the per-sample header lists into the named run phases in *groups*."""
    out = {}
    for k, v in groups.items():
        out[k] = list(itertools.chain(*(by_sample[s] for s in v)))
    return out

def by_reduced_status(by_phase, has_reduction):
    """Split each phase into runs that already have reduced output and runs that still need it."""
    out = {}
    for k, v in by_phase.items():
        wd = out[k] = {"needs": [], "has": []}
        for h in v:
            if h.metadata["start"]["uid"] in has_reduction:
                wd["has"].append(h)
            else:
                wd["needs"].append(h)
    return out

db = Broker.named("xpd")
cat = db.v2
tr = TimeRange(since="2020-12-07 19:00", until="2020-12-12 12:00")
hdrs = list(cat[d] for d in cat.search(tr).search({"bt_uid": "98812e29"}))
by_sample = groupby_sample(hdrs)
print({k: len(v) for k, v in by_sample.items()})
ticumg_data = {k: v for k, v in by_sample.items() if k is not None and "TiCuMg" in k}
by_phase = by_run_phase(
    ticumg_data,
    {
        "grid": ["TiCuMg_alloy", "TiCuMg_alloy_2", "TiCuMg_alloy_3"],
        "gpcam": ["TiCuMg_alloy_auto_1"],
        "xca": ["TiCuMg_alloy_adapt"],
    },
)
print({k: len(v) for k, v in by_phase.items()})
local_cat = BlueskyMsgpackCatalog(["reduced/*.msgpack"])
have_reduced = set(
    local_cat[d].metadata["start"]["original_start_uid"] for d in local_cat.search({})
)
work_plan = by_reduced_status(by_phase, have_reduced)
print({k: {_k: len(_v) for _k, _v in v.items()} for k, v in work_plan.items()})
just_uids = {
    k: {_k: [_h.metadata["start"]["uid"] for _h in _v] for _k, _v in v.items()}
    for k, v in work_plan.items()
}
with open(Path("~/work_plan.json").expanduser(), "w") as fout:
    json.dump(just_uids, fout)
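
# The commented-out lines below appear to be leftover interactive exploration of the
# catalogs (note that several searches use the misspelled key "oiginal_start_uid");
# they are kept for reference only.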
# len(have_reduced)
# have_reduced - data_uids
# len(have_reduced - data_uids)
# len(data_uids - have_reduced)
# to_export = list(data_uids - have_reduced)
# to_export
# len(data_uids | have_reduced)
# len(data_uids & have_reduced)
# len(to_export)
# just_copy = data_uids - have_reduced
# import itertools
# list(itertools.chain(*(local_cat.search({"original_start_uid": d}) for d in just_copy)))
# just_copy
# local_cat.search({"oiginal_start_uid": "05006b0d-e55b-4e9c-b359-e59aee82bad0"})
# list(local_cat.search({"oiginal_start_uid": "05006b0d-e55b-4e9c-b359-e59aee82bad0"}))
# have_reduced
# list(local_cat.search({"oiginal_start_uid": "ffe934b2-46ec-4607-993c-dbe509447c1a"}))
# list(local_cat.search(oiginal_start_uid="ffe934b2-46ec-4607-993c-dbe509447c1a"))
# reduced_headers = [local_cat[d] for d in local_cat.search({})]
# just_export_headers = [
# h for h in reduced_headers if h["original_start_uid"] in just_export
# ]
# just_export_headers = [
# h
# for h in reduced_headers
# if h.metadata["start"]["original_start_uid"] in just_export
# ]
# just_export_headers = [
# h for h in reduced_headers if h.metadata["start"]["original_start_uid"] in just_copy
# ]
# len(just_export_headers)
# just_copy = data_uids & have_reduced
# list(itertools.chain(*(local_cat.search({"original_start_uid": d}) for d in just_copy)))
# just_copy_reduced_uids = list(
# itertools.chain(*(local_cat.search({"original_start_uid": d}) for d in just_copy))
# )
# len(local_cat)
# len(res)
# res = list(cat.search(dict(bt_safN="306612")))
# len(res)
"""Run data reduction in-line
"""
import copy
from pathlib import Path
import json
from functools import partial
import itertools
from event_model import RunRouter as RealRunRouter
from event_model import unpack_datum_page, unpack_event_page
from rapidz import Stream, move_to_first
from rapidz.link import link
from shed import SimpleToEventStream
from suitcase.msgpack import Serializer
from xpdan.pipelines.main import (
    image_process,
    calibration,
    clear_geo_gen,
    scattering_correction,
    gen_mask,
    integration,
    clear_comp,
    start_gen,
    pdf_gen,
)
from xpdan.pipelines.to_event_model import pipeline_order as tem_pipeline_order
from xpdan.pipelines.to_event_model import to_event_stream_with_ind
from xpdan.vend.callbacks.core import RunRouter, StripDepVar
from databroker.v0 import Broker
db = Broker.named("xpd")
pipeline_order = [
    partial(start_gen, db=db, image_names=["pe2_image", "pe1_image"]),
    image_process,
    calibration,
    clear_geo_gen,
    scattering_correction,
    gen_mask,
    integration,
    pdf_gen,
    clear_comp,
]
order = pipeline_order + tem_pipeline_order
def create_analysis_pipeline(
    *, order, publisher, stage_blacklist=(), **kwargs,
):
    """Create the analysis pipeline from a list of chunks and pipeline kwargs.

    Parameters
    ----------
    order : list of functions
        The list of pipeline chunk functions
    publisher : callable
        Where the assembled event-model documents are sent
    stage_blacklist : tuple of str, optional
        analysis_stage names whose ToEventStream output should not be published
    kwargs : Any
        The kwargs to pass to the pipeline creation

    Returns
    -------
    namespace : dict
        The namespace of the pipeline
    """
    namespace = link(*order, raw_source=Stream(stream_name="raw source"), **kwargs)
    source = namespace["source"]
    # do inspection of pipeline for ToEventModel nodes, maybe?
    # for analyzed data with independent data (vis and save)
    # strip the dependent vars from the raw data
    raw_stripped = move_to_first(source.starmap(StripDepVar()))
    namespace.update(
        to_event_stream_with_ind(
            raw_stripped,
            *[
                node
                for node in namespace.values()
                if isinstance(node, SimpleToEventStream)
                and node.md.get("analysis_stage", None) not in stage_blacklist
            ],
            publisher=publisher,
        )
    )
    return namespace

def diffraction_router(start, diffraction_dets, xrd_namespace):
    # This does not support concurrent radiograms and diffractograms
    # If there are diffraction detectors in the list, this is diffraction
    if any(d in diffraction_dets for d in start["detectors"]):
        print("analyzing as diffraction")
        return lambda *x: xrd_namespace["raw_source"].emit(x)

def run_processor(
    order, *, db, diffraction_dets, stage_blacklist=(), publisher=None, **kwargs,
):
    print(kwargs)
    db.prepare_hook = lambda x, y: copy.deepcopy(y)
    rr = RunRouter(
        [diffraction_router],
        xrd_namespace=create_analysis_pipeline(
            order=order,
            stage_blacklist=stage_blacklist,
            publisher=publisher,
            **kwargs,
            db=db,
        ),
        diffraction_dets=diffraction_dets,
    )
    return rr

def filter_factory(base_name):
    def filter(name, doc):
        # Only serialize runs whose start document is tagged as integration output;
        # .get is used so start documents without the key are simply ignored.
        if doc.get("analysis_stage") == "integration":
            print("got one!")
            return (
                [Serializer(Path("~").expanduser() / "adaptive_reduced" / base_name)],
                [],
            )
        return [], []

    return filter

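# work_plan.json is produced by extract.py: it maps run phase -> reduction status
# -> list of raw start-document uids, roughly
#   {"grid": {"needs": ["<uid>", ...], "has": ["<uid>", ...]}, "gpcam": {...}, "xca": {...}}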
with open(Path("~/work_plan.json").expanduser(), "r") as fin:
just_uids = json.load(fin)
for k, v in just_uids.items():
filter = filter_factory(k)
outputter = RealRunRouter([filter])
rr = run_processor(
order=order, db=db, diffraction_dets=["pe1", "pe2"], publisher=outputter
)
for uid in itertools.chain(*v.values()):
h = db[uid]
for name, doc in h.documents():
doc = dict(doc)
if name == "datum_page":
for d in unpack_datum_page(doc):
rr("datum", d)
elif name == "event_page":
for d in unpack_event_page(doc):
rr("event", d)
else:
rr(name, doc)