Skip to content

Instantly share code, notes, and snippets.

@tacaswell
Last active April 2, 2020 19:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tacaswell/257d0d712a87e25af8f1b33b3407c84e to your computer and use it in GitHub Desktop.
Save tacaswell/257d0d712a87e25af8f1b33b3407c84e to your computer and use it in GitHub Desktop.
ingest xpd chi files

This is code to consume processed data from XPD and put it back into the document model.

sources:
  xpd_auto_202003_mongo:
    driver: bluesky-mongo-normalized-catalog
    args:
      metadatastore_db: mongodb://localhost/xpd_auto_202003_mds
      asset_registry_db: mongodb://localhost/xpd_auto_202003_ar
  xpd_auto_202003_msgpack:
    driver: bluesky-msgpack-catalog
    args:
      paths:
        - "/SOMEPATH/export/*.msgpack"
from pathlib import Path
import zipfile
import pandas
import datetime
import numpy as np
from event_model import compose_run
from collections import defaultdict
from databroker import catalog
# Directory holding the beamtime inputs; both input files live directly in it.
root_path = Path("/SOMEPATH/2020-03_xpd_autonomous/")
# Zip archive of the reduced .chi files.
chi_zip = root_path.joinpath(
    "Beamtime Reduction in Q space-20200331T172054Z-001.zip"
)
# Spreadsheet that is later read for its "sample" column.
excel_path = root_path.joinpath("20191125_XPD.xlsx")
def parse_name(inp):
    """Decode a sample name into composition and processing metadata.

    Two underscore-separated layouts are understood:

    * five fields: ``<comp>_<temp>_<time>_<batch>_<position>``
      (e.g. ``"Ni50Ti50_400C_30min_1-2_3-4"``)
    * four fields whose second field is exactly ``"Pristine"``:
      ``<comp>_Pristine_<batch>_<position>``; these default to ``"25C"``
      and ``"0min"``.

    Parameters
    ----------
    inp : str
        The sample name.

    Returns
    -------
    dict or None
        ``None`` if the name contains ``"empty"`` or matches neither layout.
        Otherwise a dict with one integer entry per element of the
        composition field plus the keys ``"temp"`` (float),
        ``"anneal_time"`` (float), ``"batch"`` (tuple of int) and
        ``"position"`` (tuple of int).

    Raises
    ------
    ValueError
        If a field has the right position but cannot be converted to a
        number (for example a non-numeric temperature).
    """
    # special case the empty sample position
    if "empty" in inp:
        return None

    # Split once and dispatch on the number of fields instead of relying on
    # unpacking errors: the sample name can have 3 or 4 underscores.
    fields = inp.split("_")
    if len(fields) == 5:
        comp, temp, anneal, batch, coord_number = fields
    elif len(fields) == 4 and fields[1] == "Pristine":
        comp, _, batch, coord_number = fields
        # a pristine sample defaults to room temperature and no cooking
        temp, anneal = "25C", "0min"
    else:
        return None

    # TODO check the post fixes are correct
    # The last three characters of the time field (e.g. "min") and the last
    # character of the temperature field (e.g. "C") are dropped without
    # being checked; "p" in the time field stands in for a decimal point.
    anneal_time = float(anneal.replace("p", ".")[:-3])
    temperature = float(temp[:-1])

    # The composition field is sliced as <2 chars><2 digits><2 chars><rest>,
    # e.g. "Ni50Ti50" -> {"Ni": 50, "Ti": 50}.
    elements = {comp[:2]: int(comp[2:4]), comp[4:6]: int(comp[6:])}

    return {
        **elements,
        "temp": temperature,
        "anneal_time": anneal_time,
        "batch": tuple(map(int, batch.split("-"))),
        "position": tuple(map(int, coord_number.split("-"))),
    }
# Load the sample spreadsheet and collect the distinct values of its
# "sample" column.
df = pandas.read_excel(excel_path)
sample_names = {*df["sample"]}


def _empty_run():
    """Return a fresh, empty container for the documents of one run."""
    return dict(
        start=None,
        descriptors=defaultdict(list),
        event_pages=defaultdict(list),
        stop=None,
    )


# Per-run document store; an entry is created the first time a key is used.
data = defaultdict(_empty_run)
def _read_chi(raw):
    """Parse the raw bytes of one ``.chi`` file.

    The layout assumed is: one header line, five comment lines, a line whose
    text after the first ``:`` is the number of bins, one more comment line,
    then one whitespace-separated ``Q I`` pair per line.  Blank lines in the
    data section are ignored.

    Parameters
    ----------
    raw : bytes
        Complete contents of the file.

    Returns
    -------
    header : str
        First line of the file, decoded as ASCII.
    Q, I : numpy.ndarray
        1-D float arrays, one entry per bin, holding the two data columns.

    Raises
    ------
    ValueError
        If the number of data rows differs from the declared number of bins
        or a row does not hold exactly two numbers.
    """
    lines = iter(raw.split(b"\n"))
    # first line of the header, which includes an interesting looking path
    header = next(lines).decode("ascii")
    # skip 5 lines of comments for humans
    for _ in range(5):
        next(lines)
    _, _, declared = next(lines).partition(b":")
    num_bins = int(declared)
    # skip line of comments
    next(lines)

    Q = np.zeros(num_bins)
    I = np.zeros(num_bins)
    n_rows = 0
    for ln in lines:
        if not ln.strip():
            continue
        if n_rows == num_bins:
            raise ValueError(
                f"more than the declared {num_bins} data rows in chi file"
            )
        # split on any run of whitespace so padding or "\r" cannot break it
        Q[n_rows], I[n_rows] = map(float, ln.split())
        n_rows += 1
    if n_rows != num_bins:
        raise ValueError(
            f"expected {num_bins} data rows in chi file, found {n_rows}"
        )
    return header, Q, I


# Walk every entry of the archive, turn each .chi file into one run
# (start / descriptor / event page / stop) and stash the documents in `data`.
with zipfile.ZipFile(chi_zip) as zipin:
    for entry_name in zipin.namelist():
        entry_path = Path(entry_name)
        # every entry is expected to be "<folder>/<file>"; any other depth
        # fails the unpacking with ValueError
        _, fname = entry_path.parts

        # File names start with the sample name.  Take the LONGEST matching
        # name so that a name which is a prefix of another one (for example
        # "..._1-1" vs "..._1-10") cannot win by accident of set ordering.
        # Non-string cells (presumably blank spreadsheet rows) are ignored.
        matches = [
            name
            for name in sample_names
            if isinstance(name, str) and fname.startswith(name)
        ]
        if not matches:
            raise ValueError(
                f"no known sample name is a prefix of {fname!r}"
            )
        sn = max(matches, key=len)

        # the rest is "<date>_<uid>_<number>_<result type>.<ext>"; the +1
        # drops the separator that follows the sample name
        fname = fname[len(sn) + 1 :]
        date_str, _, fname = fname.partition("_")
        # NOTE: dt is naive, so .timestamp() interprets it in the local
        # timezone of the machine running this script
        dt = datetime.datetime.strptime(date_str, "%Y%m%d-%H%M%S")
        ts = dt.timestamp()
        uid, _, fname = fname.partition("_")
        num, _, fname = fname.partition("_")
        res, _, ext = fname.partition(".")
        md = {
            "source_uid": uid,
            "iso_time": dt.isoformat(),
            "result_type": res,
            "result_number": int(num),
            "original_path": str(entry_path),
            "sample_name": sn,
            # parse_name returns None for names it cannot interpret (e.g.
            # the empty sample position); unpacking None would raise
            # TypeError, so fall back to no extra fields
            **(parse_name(sn) or {}),
        }

        # read via the archive's own entry name rather than a re-rendered Path
        md["chi_header"], Q, I = _read_chi(zipin.read(entry_name))
        md["num_bins"] = len(Q)

        # first, compose the run start and get the rest of the factories
        run_bundle = compose_run(time=ts, metadata=md)
        # Key the stash by the uid of the run just composed, not by the
        # source uid: several files can share a source uid, and each file is
        # its own run, so keying by source uid would overwrite start/stop
        # while descriptors and event pages pile up.
        run_docs = data[run_bundle.start_doc["uid"]]
        # stash the start document for later, could also just serialize
        run_docs["start"] = run_bundle.start_doc

        # create the descriptor and factories
        desc_bundle = run_bundle.compose_descriptor(
            name="primary",
            data_keys={
                "Q": {
                    "dtype": "number",
                    "shape": [],
                    "source": "compute",
                    # NOTE(review): Q is normally an inverse length; confirm
                    # the intended unit string
                    "units": "angstrom",
                },
                "I": {
                    "dtype": "number",
                    "shape": [],
                    "source": "compute",
                    "units": "arbitrarily",
                },
            },
        )
        # stash the descriptor for later use
        desc = desc_bundle.descriptor_doc
        run_docs["descriptors"][desc["name"]].append(desc)

        # construct an event page holding one event per bin
        evnt_page = desc_bundle.compose_event_page(
            data={"Q": Q, "I": I},
            # we might make timestamps optional in the future
            timestamps={"Q": np.ones_like(Q) * ts, "I": np.ones_like(I) * ts},
            seq_num=list(range(len(Q))),
            time=np.ones_like(Q) * ts,
            # Validation is disabled because, per the original author,
            # jsonschema does not recognise numpy arrays as "array"; there
            # is a PR to fix this in event-model.
            validate=False,
        )
        run_docs["event_pages"][desc["uid"]].append(evnt_page)
        run_docs["stop"] = run_bundle.compose_stop()
# The msgpack-backed catalog is both where the documents are written and
# where they will later be read back from.
db = catalog["xpd_auto_202003_msgpack"]
# db.v1.insert is a short-cut to a suitcase already configured to write
# where the catalog reads from.
serialize = db.v1.insert

# Emit every stashed run in document order: start, descriptors,
# event pages, stop.
for run_docs in data.values():
    serialize("start", run_docs["start"])

    # there can be more than one stream, and more than one descriptor
    # per stream
    for stream_descriptors in run_docs["descriptors"].values():
        for descriptor in stream_descriptors:
            serialize("descriptor", descriptor)

    # then all of the event pages
    for pages in run_docs["event_pages"].values():
        for event_page in pages:
            serialize("event_page", event_page)

    # the stop document closes the run
    serialize("stop", run_docs["stop"])
# # get all of the uids
# uids = list(db.search({}))
#
# # pull _all_ of the RunCatalogs
# data = [db[uid] for uid in uids]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment