The shared Globus endpoint includes one messagepack (*.msgpack) file per
Bluesky Run. DataBroker can run directly on top of these files, and that's how
we'll start. Alternatively, we can read the contents of these files into
MongoDB to gain better performance and support for more advanced search queries.
We'll address that process at the end. It may or may not be necessary depending
on your needs.
The shared endpoint also includes a directory (2019) that contains large files
generated by the detector. These files are referenced by the messagepack files
and will be loaded on demand by databroker. The internal directory structure is
important and must not be changed.
Place the messagepack files and the directory together at any location on your
system. We'll refer to it as /path/to/data_files below.
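Before going further, it can help to confirm that the files landed where expected. The following is a minimal sketch using only the standard library; the function name check_data_layout is made up for this example, and the directory name 2019 is taken from the description above.

```python
from pathlib import Path


def check_data_layout(data_dir, detector_dirname="2019"):
    """Return a list of problems found in the copied data directory.

    An empty list means the layout looks as described above.
    """
    root = Path(data_dir)
    if not root.is_dir():
        return [f"{root} is not a directory"]
    problems = []
    # The msgpack files should sit directly in the top-level directory.
    if not any(root.glob("*.msgpack")):
        problems.append(f"no *.msgpack files found in {root}")
    # The detector directory must sit next to them, contents unchanged.
    if not (root / detector_dirname).is_dir():
        problems.append(f"missing detector directory {root / detector_dirname}")
    return problems


# Replace the placeholder with your own location.
for problem in check_data_layout("/path/to/data_files"):
    print("WARNING:", problem)
```

This checks only that the pieces exist side by side. It does not verify the internal structure of the detector directory, which should simply be left untouched.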
You will need Python 3.6 or higher. I recommend creating a separate environment to work in, either with conda, if you are a conda user,
conda create -n local-databroker python=3.7
conda activate local-databroker
or else with venv (which is built into Python).
python -V # confirm you have 3.6 or higher
python -m venv local-databroker
source local-databroker/bin/activate
Now that we have created and activated an environment, install the necessary Python packages.
pip install area-detector-handlers ipython
# We need a prerelease of databroker because we're going to use some features
# still in beta at the moment.
pip install --pre databroker
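To confirm that the environment is usable before configuring anything, a quick check like the one below may help. It is only a sketch: the helper missing_modules is invented for this example, and the import names (databroker, area_detector_handlers, IPython) are assumed to correspond to the packages installed above.

```python
import importlib.util
import sys


def missing_modules(module_names):
    """Return the subset of module_names that cannot be found for import."""
    return [name for name in module_names
            if importlib.util.find_spec(name) is None]


# The tutorial requires Python 3.6 or higher.
if sys.version_info < (3, 6):
    print("Python 3.6 or higher is required; found", sys.version.split()[0])

# Assumed import names for the packages installed above.
missing = missing_modules(["databroker", "area_detector_handlers", "IPython"])
print("Missing modules:", missing or "none")
```

If anything is reported missing, check that the environment is activated in the shell where you ran pip.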
When databroker is imported, it tries to discover datasets available on the system. There are a couple of ways to configure this, but the easiest way is to create a YAML file and put it on databroker's catalog search path.
Determine your catalog search path, which is OS and environment dependent:
python -c "import databroker; print(databroker.catalog_search_path())"
Create one of those directories if it doesn't yet exist, and then place this
file there. The file may have any name, say test_data.yml.
sources:
  test_data:
    driver: bluesky-msgpack-catalog
    args:
      paths:
        - "/path/to/data_files/*.msgpack"
      root_map:
        "/tmp/export_for_mike": "/path/to/data_files"
where "/path/to/data_files" is whatever path the copied data resides in.
(The path on the left on the last line, "/tmp/export_for_mike", is the path
that the files were originally exported to. That should be left as is; it is
acting as a placeholder and will be replaced with the path on the right.)
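To make the role of root_map concrete, here is a small illustration of the kind of prefix substitution it describes. This is not databroker's implementation, which may differ in its details. The function apply_root_map and the file name example.h5 are made up for this sketch.

```python
def apply_root_map(path, root_map):
    """Replace a leading root in path according to root_map.

    Illustration only: databroker applies root_map internally, and its
    actual implementation may differ from this sketch.
    """
    for old_root, new_root in root_map.items():
        old = old_root.rstrip("/")
        # Match only whole leading path components, not arbitrary substrings.
        if path == old or path.startswith(old + "/"):
            return new_root.rstrip("/") + path[len(old):]
    return path  # no mapping applies; leave the path untouched


root_map = {"/tmp/export_for_mike": "/path/to/data_files"}
# Hypothetical file name, used only to show the substitution.
print(apply_root_map("/tmp/export_for_mike/2019/example.h5", root_map))
# prints /path/to/data_files/2019/example.h5
```

The messagepack files record the detector files under the original export location, so the mapping lets those records resolve to wherever you placed the data.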
In [1]: import databroker
In [2]: catalog = databroker.catalog['test_data']
In [3]: list(catalog)
Out[3]:
['49147bac-f099-4b4d-bb3b-f77a69ad9992',
'fd4cc444-25fe-4b4d-ab2a-dcdb1174d837']
In [4]: run = catalog['49147bac'] # first few characters will do
In [5]: run
Out[5]:
Run Catalog
uid='49147bac-f099-4b4d-bb3b-f77a69ad9992'
exit_status='success'
2019-02-15 09:48:28.140 -- 2019-02-15 09:52:03.505
Streams:
* baseline
* zps_pi_r_monitor
* primary
In [6]: dataset = run.primary.to_dask()
In [7]: dataset
Out[7]:
<xarray.Dataset>
Dimensions: (dim_0: 20, dim_1: 2160, dim_2: 2560, dim_3: 19, dim_4: 2, dim_5: 35, dim_6: 2, time: 1)
Coordinates:
* time (time) float64 1.55e+09
Dimensions without coordinates: dim_0, dim_1, dim_2, dim_3, dim_4, dim_5, dim_6
Data variables:
zps_pi_r (time) float64 dask.array<chunksize=(1,), meta=np.ndarray>
zps_pi_r_user_setpoint (time) float64 dask.array<chunksize=(1,), meta=np.ndarray>
zps_pi_r_dial_readback (time) float64 dask.array<chunksize=(1,), meta=np.ndarray>
zps_pi_r_dial_counts (time) int64 dask.array<chunksize=(1,), meta=np.ndarray>
zps_pi_r_motor_res (time) float64 dask.array<chunksize=(1,), meta=np.ndarray>
<snipped for length>
In [8]: dataset['zps_pi_r'] # Access a specific column.
Out[8]:
<xarray.DataArray 'zps_pi_r' (time: 1)>
dask.array<stack, shape=(1,), dtype=float64, chunksize=(1,), chunktype=numpy.ndarray>
Coordinates:
* time (time) float64 1.55e+09
In [9]: dataset['Andor_image']
Out[9]:
<xarray.DataArray 'Andor_image' (time: 1, dim_0: 20, dim_1: 2160, dim_2: 2560)>
dask.array<stack, shape=(1, 20, 2160, 2560), dtype=uint16, chunksize=(1, 12, 2160, 2560), chunktype=numpy.ndarray>
Coordinates:
* time (time) float64 1.55e+09
Dimensions without coordinates: dim_0, dim_1, dim_2
These objects are xarray DataArrays. They wrap labels and convenience methods
like .plot() around numpy arrays or things that act like numpy arrays. In this
case, the wrapped array is a dask array. Dask allows the data to be loaded
and processed lazily in chunks, which can be important for large image data.
To forgo all that and just load the numpy array (assuming you have enough RAM),
call dataset['Andor_image'].load() to pull a column up into memory or
dataset.load() to pull up all the columns. Having done that, you can access a
plain numpy array at, for example, dataset['Andor_image'].data.
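Whether you "have enough RAM" can be estimated from the shape and dtype shown in the output above. The sketch below is plain arithmetic using the shape (1, 20, 2160, 2560), the chunk shape (1, 12, 2160, 2560), and the uint16 dtype reported for 'Andor_image'. The helper nbytes is made up for this example.

```python
from functools import reduce
from operator import mul


def nbytes(shape, itemsize):
    """Size in bytes of a dense array with the given shape and item size."""
    return reduce(mul, shape, 1) * itemsize


UINT16_BYTES = 2  # 'Andor_image' has dtype uint16

full = nbytes((1, 20, 2160, 2560), UINT16_BYTES)   # the whole array
chunk = nbytes((1, 12, 2160, 2560), UINT16_BYTES)  # the largest dask chunk

print(f"full array: {full / 2**20:.1f} MiB")  # prints full array: 210.9 MiB
print(f"one chunk:  {chunk / 2**20:.1f} MiB")  # prints one chunk:  126.6 MiB
```

For this Run, the image data fits comfortably in memory on most machines. For Runs with many more frames, the same arithmetic shows when chunked processing with dask starts to matter.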
To illustrate some of this:
In [10]: dataset['Andor_image'].data # a dask.array inside this xarray.DataArray
Out[10]: dask.array<stack, shape=(1, 20, 2160, 2560), dtype=uint16, chunksize=(1, 12, 2160, 2560), chunktype=numpy.ndarray>
In [11]: dataset['Andor_image'].sum() # can do normal numpy things memory-efficiently
Out[11]:
<xarray.DataArray 'Andor_image' ()>
dask.array<sum-aggregate, shape=(), dtype=uint64, chunksize=(), chunktype=numpy.ndarray>
# Dask can do normal numpy things memory-efficiently and parallelized!
In [12]: dataset['Andor_image'].sum().compute()
Out[12]:
<xarray.DataArray 'Andor_image' ()>
array(11534166777, dtype=uint64)
In [13]: dataset['Andor_image'].load() # or bypass dask and just load all the data
Out[13]:
<xarray.DataArray 'Andor_image' (time: 1, dim_0: 20, dim_1: 2160, dim_2: 2560)>
array([[[[101, 93, 105, ..., 96, 100, 114],
[ 91, 109, 112, ..., 96, 107, 100],
[ 92, 88, 98, ..., 97, 115, 88],
...,
[ 97, 97, 101, ..., 76, 89, 92],
[114, 91, 87, ..., 117, 101, 98],
[120, 113, 104, ..., 101, 87, 81]],
[[109, 101, 103, ..., 98, 116, 118],
[ 98, 109, 107, ..., 110, 102, 114],
[ 92, 87, 97, ..., 95, 116, 110],
...,
[ 96, 107, 102, ..., 90, 99, 104],
[129, 99, 93, ..., 114, 110, 94],
[105, 101, 95, ..., 68, 88, 92]],
[[115, 101, 101, ..., 108, 114, 105],
[ 97, 89, 71, ..., 98, 111, 97],
[105, 93, 92, ..., 107, 103, 82],
...,
[ 95, 112, 100, ..., 109, 91, 86],
[130, 117, 89, ..., 99, 97, 105],
[130, 98, 100, ..., 97, 97, 97]],
...,
[[110, 99, 105, ..., 120, 125, 112],
[101, 99, 94, ..., 100, 111, 110],
[ 90, 98, 95, ..., 103, 112, 93],
...,
[101, 97, 99, ..., 84, 84, 94],
[112, 115, 100, ..., 105, 117, 81],
[ 83, 97, 106, ..., 81, 92, 86]],
[[109, 108, 90, ..., 103, 110, 101],
[101, 119, 99, ..., 101, 107, 104],
[111, 94, 99, ..., 95, 91, 106],
...,
[105, 102, 112, ..., 90, 99, 112],
[121, 116, 84, ..., 93, 117, 87],
[104, 95, 105, ..., 95, 96, 85]],
[[114, 101, 94, ..., 88, 106, 87],
[ 91, 110, 96, ..., 100, 116, 105],
[ 99, 95, 121, ..., 103, 102, 108],
...,
[ 97, 112, 114, ..., 94, 87, 113],
[126, 113, 113, ..., 107, 98, 92],
[104, 101, 100, ..., 87, 90, 89]]]], dtype=uint16)
Coordinates:
* time (time) float64 1.55e+09
Dimensions without coordinates: dim_0, dim_1, dim_2
In [14]: dataset['Andor_image'].data # Now data is just a numpy array
Out[14]:
array([[[[101, 93, 105, ..., 96, 100, 114],
[ 91, 109, 112, ..., 96, 107, 100],
[ 92, 88, 98, ..., 97, 115, 88],
...,
[ 97, 97, 101, ..., 76, 89, 92],
[114, 91, 87, ..., 117, 101, 98],
[120, 113, 104, ..., 101, 87, 81]],
[[109, 101, 103, ..., 98, 116, 118],
[ 98, 109, 107, ..., 110, 102, 114],
[ 92, 87, 97, ..., 95, 116, 110],
...,
[ 96, 107, 102, ..., 90, 99, 104],
[129, 99, 93, ..., 114, 110, 94],
[105, 101, 95, ..., 68, 88, 92]],
[[115, 101, 101, ..., 108, 114, 105],
[ 97, 89, 71, ..., 98, 111, 97],
[105, 93, 92, ..., 107, 103, 82],
...,
[ 95, 112, 100, ..., 109, 91, 86],
[130, 117, 89, ..., 99, 97, 105],
[130, 98, 100, ..., 97, 97, 97]],
...,
[[110, 99, 105, ..., 120, 125, 112],
[101, 99, 94, ..., 100, 111, 110],
[ 90, 98, 95, ..., 103, 112, 93],
...,
[101, 97, 99, ..., 84, 84, 94],
[112, 115, 100, ..., 105, 117, 81],
[ 83, 97, 106, ..., 81, 92, 86]],
[[109, 108, 90, ..., 103, 110, 101],
[101, 119, 99, ..., 101, 107, 104],
[111, 94, 99, ..., 95, 91, 106],
...,
[105, 102, 112, ..., 90, 99, 112],
[121, 116, 84, ..., 93, 117, 87],
[104, 95, 105, ..., 95, 96, 85]],
[[114, 101, 94, ..., 88, 106, 87],
[ 91, 110, 96, ..., 100, 116, 105],
[ 99, 95, 121, ..., 103, 102, 108],
...,
[ 97, 112, 114, ..., 94, 87, 113],
[126, 113, 113, ..., 107, 98, 92],
[104, 101, 100, ..., 87, 90, 89]]]], dtype=uint16)
If you have no need for memory efficiency or parallelism, you can skip dask entirely by loading the data with
dataset = run.primary.read()
which is equivalent to
dataset = run.primary.to_dask().load()
See this section of the databroker documentation and the links above to xarray and dask for more.
The above uses the databroker "v2" interface. There is also a "v1" interface for backward compatibility with previous versions of databroker. You can easily move between them and fall back to "v1" if you prefer, without reinstalling anything: see the last code block in this section if this is relevant to you.
Install MongoDB. See these official guides for installing the free Community Edition on any OS.
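Once the server is installed and started, it can be useful to confirm that something is listening before pointing databroker at it. The following sketch uses only the standard library and assumes the default address localhost:27017, the same one used in the configuration in this section. The helper port_is_open is made up for this example.

```python
import socket


def port_is_open(host="localhost", port=27017, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name-resolution errors.
        return False


if port_is_open():
    print("Something is listening on localhost:27017")
else:
    print("Nothing is listening on localhost:27017; is mongod running?")
```

A successful connection shows only that the port is open, not that the process behind it is MongoDB, so treat this as a first sanity check.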
Update your YAML file to add a new section like so:
sources:
  test_data:
    driver: bluesky-msgpack-catalog
    args:
      paths:
        - "/path/to/data_files/*.msgpack"
      root_map:
        "/tmp/export_for_mike": "/path/to/data_files"
  test_data_in_database:
    driver: bluesky-mongo-normalized-catalog
    args:
      metadatastore_db: mongodb://localhost:27017/test_metadatastore
      asset_registry_db: mongodb://localhost:27017/test_asset_registry
      root_map:
        "/tmp/export_for_mike": "/path/to/data_files"
where again "/path/to/data_files" is whatever path the copied data resides in.
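Since the data path is the only thing that varies in this file, one way to avoid indentation and quoting mistakes is to render it from a template. This is just a sketch: the function render_catalog_yaml is made up for this example, and the database names are the illustrative ones used in this section.

```python
import json

TEMPLATE = """\
sources:
  test_data:
    driver: bluesky-msgpack-catalog
    args:
      paths:
        - {glob}
      root_map:
        {exported}: {data_dir}
  test_data_in_database:
    driver: bluesky-mongo-normalized-catalog
    args:
      metadatastore_db: mongodb://localhost:27017/test_metadatastore
      asset_registry_db: mongodb://localhost:27017/test_asset_registry
      root_map:
        {exported}: {data_dir}
"""


def render_catalog_yaml(data_dir, exported_root="/tmp/export_for_mike"):
    """Return the catalog YAML text for the given data directory."""
    data_dir = data_dir.rstrip("/")
    # json.dumps produces double-quoted strings, which are also valid YAML.
    return TEMPLATE.format(
        glob=json.dumps(data_dir + "/*.msgpack"),
        exported=json.dumps(exported_root),
        data_dir=json.dumps(data_dir),
    )


print(render_catalog_yaml("/path/to/data_files"))
```

Write the result to a file on the catalog search path. Note that the glob pattern belongs under paths, while root_map takes the plain directory.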
Now migrate the contents of the *.msgpack files into MongoDB like so.
Disclaimer: I have not tested this exact code, but it is adapted from a
verified-working example. Please speak up if you encounter problems as the error
could be mine.
In [1]: import databroker
In [2]: import suitcase.mongo_normalized
In [3]: destination_catalog = databroker.catalog['test_data_in_database']
In [4]: serializer = suitcase.mongo_normalized.Serializer(
   ...:     destination_catalog._metadatastore_db,
   ...:     destination_catalog._asset_registry_db)
In [8]: import intake
In [9]: BlueskyMsgpackCatalog = intake.registry['bluesky-msgpack-catalog']  # a future-proof way of getting this class
In [10]: source_catalog = BlueskyMsgpackCatalog('/path/to/data_files/*.msgpack')
In [11]: for _, run in source_catalog.items():
    ...:     for name, doc in run.canonical(fill='no'):
    ...:         serializer(name, doc)
    ...:
Then access the data as above, using databroker.catalog['test_data_in_database']
in place of databroker.catalog['test_data'].