Skip to content

Instantly share code, notes, and snippets.

@tacaswell
Last active Apr 2, 2020
Embed
What would you like to do?
ingest xpd chi files

This is code to consume processed data from XPD back into document model

sources:
xpd_auto_202003_mongo:
driver: bluesky-mongo-normalized-catalog
args:
metadatastore_db: mongodb://localhost/xpd_auto_202003_mds
asset_registry_db: mongodb://localhost/xpd_auto_202003_ar
xpd_auto_202003_msgpack:
driver: bluesky-msgpack-catalog
args:
paths:
- "/SOMEPATH/export/*.msgpack"
from pathlib import Path
import zipfile
import pandas
import datetime
import numpy as np
from event_model import compose_run
from collections import defaultdict
from databroker import catalog
root_path = Path("/SOMEPATH/2020-03_xpd_autonomous/")
chi_zip = root_path / "Beamtime Reduction in Q space-20200331T172054Z-001.zip"
excel_path = root_path / "20191125_XPD.xlsx"
def parse_name(inp):
# special case the empty sample position
if "empty" in inp:
return None
# the sample name can have 3 or 4 _
try:
comp, temp, time, batch, coord_number = inp.split("_")
except ValueError:
try:
comp, pristene, batch, coord_number = inp.split("_")
# if the sample is "pristene" default to room temperature and 0 cooking
if pristene == "Pristine":
temp = "25C"
time = "0min"
else:
return None
except ValueError:
return None
# TODO check the post fixes are correct
time = float(time.replace("p", ".")[:-3])
temp = float(temp[:-1])
comp = {comp[:2]: int(comp[2:4]), comp[4:6]: int(comp[6:])}
return {
**comp,
"temp": temp,
"anneal_time": time,
"batch": tuple(map(int, batch.split("-"))),
"position": tuple(map(int, coord_number.split("-"))),
}
df = pandas.read_excel(excel_path)
sample_names = set(df["sample"])
data = defaultdict(
lambda: {
"start": None,
"descriptors": defaultdict(list),
"event_pages": defaultdict(list),
"stop": None,
}
)
with zipfile.ZipFile(chi_zip) as zipin:
for n in zipin.namelist():
n = Path(n)
_, fname = n.parts
for sn in sample_names:
if fname.startswith(sn):
break
else:
raise ValueError
fname = fname[len(sn) + 1 :]
date_str, _, fname = fname.partition("_")
dt = datetime.datetime.strptime(date_str, "%Y%m%d-%H%M%S")
ts = dt.timestamp()
uid, _, fname = fname.partition("_")
num, _, fname = fname.partition("_")
res, _, ext = fname.partition(".")
md = {
"source_uid": uid,
"iso_time": dt.isoformat(),
"result_type": res,
"result_number": int(num),
"original_path": str(n),
"sample_name": sn,
**parse_name(sn),
}
fin = iter(zipin.read(str(n)).split(b"\n"))
# grab first line of the header which includes in interesting looking path
md["chi_header"] = next(fin).decode("ascii")
# skip 5 lines of comments for humans
for j in range(5):
next(fin)
_, _, num = next(fin).partition(b":")
num = int(num)
md["num_bins"] = num
Q = np.zeros(num)
I = np.zeros(num)
# skip line of comments
next(fin)
for k, ln in enumerate(fin):
if ln.strip():
Q[k], I[k] = map(float, ln.split(b" "))
assert k == num
# first, compose the run start and get the rest of the factories
run_bundle = compose_run(time=ts, metadata=md)
# stash the start document for later, could also just serialize
data[uid]["start"] = run_bundle.start_doc
# create the descriptor and factories
desc_bundle = run_bundle.compose_descriptor(
name="primary",
data_keys={
"Q": {
"dtype": "number",
"shape": [],
"source": "compute",
"units": "angstrom",
},
"I": {
"dtype": "number",
"shape": [],
"source": "compute",
"units": "arbitrarily",
},
},
)
# stash the descriptor for later use
desc = desc_bundle.descriptor_doc
data[uid]["descriptors"][desc["name"]].append(desc)
# construct and event page
evnt_page = desc_bundle.compose_event_page(
data={"Q": Q, "I": I},
# we might make timestamps optional in the future
timestamps={"Q": np.ones_like(Q) * ts, "I": np.ones_like(I) * ts},
seq_num=list(range(len(Q))),
time=np.ones_like(Q) * ts,
# there is a PR to fix this in event-model the problem is
# that it is that jsonschema is not identifying that numpy
# arrays are "array" from a validation point of view 🤦
validate=False,
)
data[uid]["event_pages"][desc["uid"]].append(evnt_page)
data[uid]["stop"] = run_bundle.compose_stop()
# grab a reference to the catalog (backed by msgpack files)
db = catalog['xpd_auto_202003_msgpack']
# short-cut to get the correct suitcase configured to write where
# the catalog reads from
serialize = db.v1.insert
# loop through the dictionary we built above
for v in data.values():
# insert the start
serialize('start', v['start'])
# for each stream insert the descriptors
# there can be more than one stream
for stream, descs in v['descriptors'].items():
# and each stream can have more than one descriptor
for d in descs:
serialize('descriptor', d)
# loop through and insert all of the event pages
for k, event_pages in v['event_pages'].items():
for page in event_pages:
serialize('event_page', page)
# insert the stop document
serialize('stop', v['stop'])
# # get all of the uids
# uids = list(db.search({}))
#
# # pull _all_ of the RunCatalogs
# data = [db[uid] for uid in uids]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment