Running a Local DataBroker

Place downloaded files

The shared Globus endpoint includes one messagepack (*.msgpack) file per Bluesky Run. DataBroker can run on top of these files directly, and that's how we'll start. Alternatively, we can read the contents of these files into MongoDB to gain better performance and support for more advanced search queries. We'll address that process at the end; it may or may not be necessary depending on your needs.

The shared endpoint also includes a directory (2019) that contains large files generated by the detector. These files are referenced by the messagepack files and will be loaded on demand by databroker. The internal directory structure is important and must not be changed.

Place the messagepack files and the directory together at any location on your system. We'll refer to it as /path/to/data_files below.
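For example, a layout like the following works (illustrative only; the msgpack file names will be whatever was exported, typically one file per run):

/path/to/data_files/
├── 49147bac-f099-4b4d-bb3b-f77a69ad9992.msgpack
├── fd4cc444-25fe-4b4d-ab2a-dcdb1174d837.msgpack
└── 2019/
    └── ...   (detector files; leave the internal structure unchanged)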

Create software environment

You will need Python 3.6 or higher. I recommend creating a separate environment to work in, either with conda, if you are a conda user,

conda create -n local-databroker python=3.7
conda activate local-databroker

or else with venv (which is built into Python).

python -V  # confirm you have 3.6 or higher
python -m venv local-databroker
source local-databroker/bin/activate

Now that we have created and activated an environment, install the necessary Python packages.

pip install area-detector-handlers ipython
# We need a prerelease of databroker because we're going to use some features
# still in beta at the moment.
pip install --pre databroker
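
To confirm the installation, you can print the installed version (a quick check; any recent pre-release should do for the examples below):

python -c "import databroker; print(databroker.__version__)"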

Configure a "catalog file"

When databroker is imported, it tries to discover datasets available on the system. There are a couple of ways to configure this; the easiest is to create a YAML file and put it on databroker's catalog search path.

Determine your catalog search path, which is OS- and environment-dependent:

python -c "import databroker; print(databroker.catalog_search_path())"

Create one of those directories if it doesn't yet exist, and then place the following file there. The file may have any name, for example test_data.yml.

sources:
  test_data:
      driver: bluesky-msgpack-catalog
      args:
        paths:
          - "/path/to/data_files/*.msgpack"
        root_map:
          "/tmp/export_for_mike": "/path/to/data_files"

where "/path/to/data_files" is whatever path the copied data resides in. (The path on the left on the last line, "/tmp/exported_for_mike" is the path that the files were originally exported to. That should be left as is; it is acting as a placeholder and will be replaced with the path on the right.)

Access the data

In [1]: import databroker

In [2]: catalog = databroker.catalog['test_data']

In [3]: list(catalog)
Out[3]:
['49147bac-f099-4b4d-bb3b-f77a69ad9992',
 'fd4cc444-25fe-4b4d-ab2a-dcdb1174d837']

In [4]: run = catalog['49147bac']  # first few characters will do

In [5]: run
Out[5]:
Run Catalog
  uid='49147bac-f099-4b4d-bb3b-f77a69ad9992'
  exit_status='success'
  2019-02-15 09:48:28.140 -- 2019-02-15 09:52:03.505
  Streams:
    * baseline
    * zps_pi_r_monitor
    * primary

In [6]: dataset = run.primary.to_dask()

In [7]: dataset
Out[7]:
<xarray.Dataset>
Dimensions:                                  (dim_0: 20, dim_1: 2160, dim_2: 2560, dim_3: 19, dim_4: 2, dim_5: 35, dim_6: 2, time: 1)
Coordinates:
  * time                                     (time) float64 1.55e+09
Dimensions without coordinates: dim_0, dim_1, dim_2, dim_3, dim_4, dim_5, dim_6
Data variables:
    zps_pi_r                                 (time) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    zps_pi_r_user_setpoint                   (time) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    zps_pi_r_dial_readback                   (time) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    zps_pi_r_dial_counts                     (time) int64 dask.array<chunksize=(1,), meta=np.ndarray>
    zps_pi_r_motor_res                       (time) float64 dask.array<chunksize=(1,), meta=np.ndarray>
<snipped for length>

In [8]: dataset['zps_pi_r']  # Access a specific column.
Out[8]:
<xarray.DataArray 'zps_pi_r' (time: 1)>
dask.array<stack, shape=(1,), dtype=float64, chunksize=(1,), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) float64 1.55e+09

In [9]: dataset['Andor_image']
Out[9]:
<xarray.DataArray 'Andor_image' (time: 1, dim_0: 20, dim_1: 2160, dim_2: 2560)>
dask.array<stack, shape=(1, 20, 2160, 2560), dtype=uint16, chunksize=(1, 12, 2160, 2560), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) float64 1.55e+09
Dimensions without coordinates: dim_0, dim_1, dim_2

These objects are xarray DataArrays. They wrap labels and convenience methods like .plot() around numpy arrays or things that act like numpy arrays. In this case, the wrapped array is a dask array. Dask allows the data to be loaded and processed lazily in chunks, which can be important for large image data. To forgo all that and just load the numpy array (assuming you have enough RAM), call dataset['Andor_image'].load() to pull a column into memory or dataset.load() to pull up all the columns. Having done that, you can access a plain numpy array at, for example, dataset['Andor_image'].data.

To illustrate some of this:

In [10]: dataset['Andor_image'].data  # a dask.array inside this xarray.DataArray
Out[10]: dask.array<stack, shape=(1, 20, 2160, 2560), dtype=uint16, chunksize=(1, 12, 2160, 2560), chunktype=numpy.ndarray>

In [11]: dataset['Andor_image'].sum()  # can do normal numpy things memory-efficiently
Out[11]:
<xarray.DataArray 'Andor_image' ()>
dask.array<sum-aggregate, shape=(), dtype=uint64, chunksize=(), chunktype=numpy.ndarray>

# Dask can do normal numpy things memory-efficiently and parallelized!
In [12]: dataset['Andor_image'].sum().compute()
Out[12]:
<xarray.DataArray 'Andor_image' ()>
array(11534166777, dtype=uint64)

In [13]: dataset['Andor_image'].load()  # or bypass dask and just load all the data
Out[13]:
<xarray.DataArray 'Andor_image' (time: 1, dim_0: 20, dim_1: 2160, dim_2: 2560)>
array([[[[101,  93, 105, ...,  96, 100, 114],
         [ 91, 109, 112, ...,  96, 107, 100],
         [ 92,  88,  98, ...,  97, 115,  88],
         ...,
         [ 97,  97, 101, ...,  76,  89,  92],
         [114,  91,  87, ..., 117, 101,  98],
         [120, 113, 104, ..., 101,  87,  81]],

        [[109, 101, 103, ...,  98, 116, 118],
         [ 98, 109, 107, ..., 110, 102, 114],
         [ 92,  87,  97, ...,  95, 116, 110],
         ...,
         [ 96, 107, 102, ...,  90,  99, 104],
         [129,  99,  93, ..., 114, 110,  94],
         [105, 101,  95, ...,  68,  88,  92]],

        [[115, 101, 101, ..., 108, 114, 105],
         [ 97,  89,  71, ...,  98, 111,  97],
         [105,  93,  92, ..., 107, 103,  82],
         ...,
         [ 95, 112, 100, ..., 109,  91,  86],
         [130, 117,  89, ...,  99,  97, 105],
         [130,  98, 100, ...,  97,  97,  97]],

        ...,

        [[110,  99, 105, ..., 120, 125, 112],
         [101,  99,  94, ..., 100, 111, 110],
         [ 90,  98,  95, ..., 103, 112,  93],
         ...,
         [101,  97,  99, ...,  84,  84,  94],
         [112, 115, 100, ..., 105, 117,  81],
         [ 83,  97, 106, ...,  81,  92,  86]],

        [[109, 108,  90, ..., 103, 110, 101],
         [101, 119,  99, ..., 101, 107, 104],
         [111,  94,  99, ...,  95,  91, 106],
         ...,
         [105, 102, 112, ...,  90,  99, 112],
         [121, 116,  84, ...,  93, 117,  87],
         [104,  95, 105, ...,  95,  96,  85]],

        [[114, 101,  94, ...,  88, 106,  87],
         [ 91, 110,  96, ..., 100, 116, 105],
         [ 99,  95, 121, ..., 103, 102, 108],
         ...,
         [ 97, 112, 114, ...,  94,  87, 113],
         [126, 113, 113, ..., 107,  98,  92],
         [104, 101, 100, ...,  87,  90,  89]]]], dtype=uint16)
Coordinates:
  * time     (time) float64 1.55e+09
Dimensions without coordinates: dim_0, dim_1, dim_2

In [14]: dataset['Andor_image'].data  # Now data is just a numpy array
Out[14]:
array([[[[101,  93, 105, ...,  96, 100, 114],
         [ 91, 109, 112, ...,  96, 107, 100],
         [ 92,  88,  98, ...,  97, 115,  88],
         ...,
         [ 97,  97, 101, ...,  76,  89,  92],
         [114,  91,  87, ..., 117, 101,  98],
         [120, 113, 104, ..., 101,  87,  81]],

        [[109, 101, 103, ...,  98, 116, 118],
         [ 98, 109, 107, ..., 110, 102, 114],
         [ 92,  87,  97, ...,  95, 116, 110],
         ...,
         [ 96, 107, 102, ...,  90,  99, 104],
         [129,  99,  93, ..., 114, 110,  94],
         [105, 101,  95, ...,  68,  88,  92]],

        [[115, 101, 101, ..., 108, 114, 105],
         [ 97,  89,  71, ...,  98, 111,  97],
         [105,  93,  92, ..., 107, 103,  82],
         ...,
         [ 95, 112, 100, ..., 109,  91,  86],
         [130, 117,  89, ...,  99,  97, 105],
         [130,  98, 100, ...,  97,  97,  97]],

        ...,

        [[110,  99, 105, ..., 120, 125, 112],
         [101,  99,  94, ..., 100, 111, 110],
         [ 90,  98,  95, ..., 103, 112,  93],
         ...,
         [101,  97,  99, ...,  84,  84,  94],
         [112, 115, 100, ..., 105, 117,  81],
         [ 83,  97, 106, ...,  81,  92,  86]],

        [[109, 108,  90, ..., 103, 110, 101],
         [101, 119,  99, ..., 101, 107, 104],
         [111,  94,  99, ...,  95,  91, 106],
         ...,
         [105, 102, 112, ...,  90,  99, 112],
         [121, 116,  84, ...,  93, 117,  87],
         [104,  95, 105, ...,  95,  96,  85]],

        [[114, 101,  94, ...,  88, 106,  87],
         [ 91, 110,  96, ..., 100, 116, 105],
         [ 99,  95, 121, ..., 103, 102, 108],
         ...,
         [ 97, 112, 114, ...,  94,  87, 113],
         [126, 113, 113, ..., 107,  98,  92],
         [104, 101, 100, ...,  87,  90,  89]]]], dtype=uint16)

If you have no need for memory efficiency or parallelism, you can skip dask entirely by loading the data like

dataset = run.primary.read()

which is equivalent to

dataset = run.primary.to_dask().load()

See this section of the databroker documentation and the links above to xarray and dask for more.

The above uses the databroker "v2" interface. There is also a "v1" interface for backward compatibility with previous versions of databroker. You can move freely between them, and fall back to "v1" if you are so inclined, without reinstalling anything; see the last code block in the documentation section linked above if this is relevant to you.
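
As a quick sketch of what that looks like (assuming the .v1 accessor provided by the databroker pre-release installed above):

In [15]: db = databroker.catalog['test_data'].v1  # v1-style Broker

In [16]: header = db['49147bac']  # v1-style Header, looked up by partial uid

In [17]: df = header.table('primary')  # the 'primary' stream as a pandas DataFrame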

Optional: Ingest data into MongoDB.

Install Mongo. See these official guides for installing the free Community Edition on any OS.
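
Any standard installation will do. If you would rather not install MongoDB system-wide and you have Docker available, running it in a container is one option:

docker run -d --name databroker-mongo -p 27017:27017 mongo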

Update your YAML file to add a new section like so:

sources:
  test_data:
      driver: bluesky-msgpack-catalog
      args:
        paths:
          - "/path/to/data_files/*.msgpack"
        root_map:
          "/tmp/export_for_mike": "/path/to/data_files"
  test_data_in_database:
      driver: bluesky-mongo-normalized-catalog
      args:
        metadatastore_db: mongodb://localhost:27017/test_metadatastore
        asset_registry_db: mongodb://localhost:27017/test_asset_registry
        root_map:
          "/tmp/export_for_mike": "/path/to/data_files"

where again "/path/to/data_files" is whatever path the copied data resides in.

Now migrate the contents of the *.msgpack files into MongoDB like so. Disclaimer: I have not tested this exact code, but it is adapted from a verified-working example. Please speak up if you encounter problems, as the error could be mine.

In [1]: import databroker

In [2]: import suitcase.mongo_normalized

In [3]: destination_catalog = databroker.catalog['test_data_in_database']

In [4]: serializer = suitcase.mongo_normalized.Serializer(
   ...:     destination_catalog._metadatastore_db,
   ...:     destination_catalog._asset_registry_db)

In [8]: import intake

In [9]: BlueskyMsgpackCatalog = intake.registry['bluesky-msgpack-catalog']  # a future-proof way of getting this class

In [10]: source_catalog = BlueskyMsgpackCatalog('/path/to/data_files/*.msgpack')

In [11]: for _, run in source_catalog.items():
    ...:     for name, doc in run.canonical(fill='no'):
    ...:         serializer(name, doc)
    ...:

Then access the data as above, using databroker.catalog['test_data_in_database'] in place of databroker.catalog['test_data'].
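
As a quick check (a sketch; 'plan_name' is just one example of a field recorded in the Run Start document), you can confirm the runs are visible and try a Mongo-style query:

In [12]: db_catalog = databroker.catalog['test_data_in_database']

In [13]: list(db_catalog)  # should list the same run uids as the msgpack catalog

In [14]: results = db_catalog.search({'plan_name': 'count'})  # Mongo-style query on Run Start metadata

In [15]: list(results)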
