-
-
Save jklynch/a4366b8900ec0c03883403455ae711b2 to your computer and use it in GitHub Desktop.
PersistentDict snippet for beamline startup scripts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pathlib import Path | |
import appdirs | |
try: | |
from bluesky.utils import PersistentDict | |
except ImportError: | |
import msgpack | |
import msgpack_numpy | |
import zict | |
class PersistentDict(zict.Func): | |
""" | |
A MutableMapping which syncs it contents to disk. | |
The contents are stored as msgpack-serialized files, with one file per item | |
in the mapping. | |
Note that when an item is *mutated* it is not immediately synced: | |
>>> d['sample'] = {"color": "red"} # immediately synced | |
>>> d['sample']['shape'] = 'bar' # not immediately synced | |
but that the full contents are synced to disk when the PersistentDict | |
instance is garbage collected. | |
""" | |
def __init__(self, directory): | |
self._directory = directory | |
self._file = zict.File(directory) | |
self._cache = {} | |
super().__init__(self._dump, self._load, self._file) | |
self.reload() | |
# Similar to flush() or _do_update(), but without reference to self | |
# to avoid circular reference preventing collection. | |
# NOTE: This still doesn't guarantee call on delete or gc.collect()! | |
# Explicitly call flush() if immediate write to disk required. | |
def finalize(zfile, cache, dump): | |
zfile.update((k, dump(v)) for k, v in cache.items()) | |
import weakref | |
self._finalizer = weakref.finalize( | |
self, finalize, self._file, self._cache, PersistentDict._dump) | |
@property | |
def directory(self): | |
return self._directory | |
def __setitem__(self, key, value): | |
self._cache[key] = value | |
super().__setitem__(key, value) | |
def __getitem__(self, key): | |
return self._cache[key] | |
def __delitem__(self, key): | |
del self._cache[key] | |
super().__delitem__(key) | |
def __repr__(self): | |
return f"<{self.__class__.__name__} {dict(self)!r}>" | |
@staticmethod | |
def _dump(obj): | |
"Encode as msgpack using numpy-aware encoder." | |
# See https://github.com/msgpack/msgpack-python#string-and-binary-type | |
# for more on use_bin_type. | |
return msgpack.packb( | |
obj, | |
default=msgpack_numpy.encode, | |
use_bin_type=True) | |
@staticmethod | |
def _load(file): | |
return msgpack.unpackb( | |
file, | |
object_hook=msgpack_numpy.decode, | |
raw=False) | |
def flush(self): | |
"""Force a write of the current state to disk""" | |
for k, v in self.items(): | |
super().__setitem__(k, v) | |
def reload(self): | |
"""Force a reload from disk, overwriting current cache""" | |
self._cache = dict(super().items()) | |
runengine_metadata_dir = appdirs.user_data_dir(appname="bluesky") / Path("runengine-metadata") | |
# PersistentDict will create the directory if it does not exist | |
RE.md = PersistentDict(runengine_metadata_dir) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment