Skip to content

Instantly share code, notes, and snippets.

@danielballan
Last active March 28, 2022 15:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save danielballan/e5cf310ceca01e694aed9efbdcbe8351 to your computer and use it in GitHub Desktop.
Save danielballan/e5cf310ceca01e694aed9efbdcbe8351 to your computer and use it in GitHub Desktop.
Store and search tomographic reconstructions with Tiled
[flake8]
max-line-length = 115
[settings]
profile=black

Store and search tomographic reconstructions with Tiled

Instructions

  1. Start MongoDB somewhere.

  2. Make the directory /tmp/fxi-recon-tiled-dev or update config.yml to point at an existing, writeable directory. If MongoDB is running somewhere other than localhost:27017, update config.yml accordingly.

  3. In a clean Python environment, install the server requirements. I tested on Python 3.8. Anything 3.8 or higher should work.

    pip install -r requirements-server.txt
    
  4. Start a Tiled server using the provided configuration.

    tiled serve config config.yml
    

    The files server_ext.py and queries.py must either be present in the same directory as config.yml, as they are in this gist repo, or else somewhere on Python's import path.

  5. In your Tomviz environment, install the client requirements.

    pip install --upgrade 'tiled[client]>=0.1.0a59'
    
  6. Use the gist repo as your working directory, or else ensure that client_ext.py and queries.py are somewhere on Python's import path.

  7. In the output from Step 4, starting the server, you'll see an API key. Set that API key as an environment variable.

    export TILED_API_KEY=...
    
  8. Run python example.py to submit a "reconstruction" with some metadata, search for it, and retrieve it.

It is left as an exercise to the reader to integrate these Python function calls, and perhaps other queries of interest, with the Tomviz UI.

import time
from dataclasses import asdict
from tiled.client.utils import handle_error
from tiled.structures.array import ArrayMacroStructure, ArrayStructure, BuiltinDtype
def submit_recon(context, array, metadata):
    """POST a reconstruction's metadata/structure, then PUT its array bytes.

    Parameters
    ----------
    context : tiled client Context
        Used for authenticated HTTP access via its private ``_client``.
    array : numpy.ndarray
        Any shape/dtype (other than object dtype).
    metadata : dict
        Freeform, JSON-serializable metadata.

    Returns
    -------
    str
        The server-assigned uid of the new reconstruction.
    """
    structure = ArrayStructure(
        macro=ArrayMacroStructure(
            shape=array.shape,
            # just one chunk for now...
            chunks=tuple((size,) for size in array.shape),
        ),
        micro=BuiltinDtype.from_numpy_dtype(array.dtype),
    )
    data = {"metadata": metadata, "structure": asdict(structure)}
    response = context._client.post("/recon/", json=data)
    handle_error(response)
    uid = response.json()["uid"]
    # Brief pause before uploading data for the freshly-created document.
    # TODO(review): replace with a proper readiness check if needed.
    time.sleep(0.1)
    # BUG FIX: check the PUT response for errors instead of discarding it,
    # and return the uid so callers can look the record up later.
    response = context._client.put(f"/recon/{uid}", content=array.tobytes())
    handle_error(response)
    return uid
# Tiled server configuration for the reconstruction store.
authentication:
  # Anyone may read; an API key (printed at server start) is needed to write.
  allow_anonymous_access: true
trees:
  - path: /
    tree: "server_ext:MongoAdapter.from_uri"
    args:
      uri: mongodb://localhost:27017/fxi-recon-tiled-dev
      directory: /tmp/fxi-recon-tiled-dev # writeable directory
import numpy
from tiled.client import from_uri, show_logs
from client_ext import submit_recon
from queries import scan_id # local file in this gist
# show_logs()  # Uncomment to log HTTP traffic to stderr.
c = from_uri("http://localhost:8000/api")

# Placeholders for reconstructions.
# Numpy arrays of any dimensionality, shape, or dtype
# (other than Python "object" type) are accepted.
recon1 = numpy.ones((5, 5, 5))
recon2 = 2 * recon1
recon3 = 3 * recon1

# The dict of metadata here is totally freeform;
# submit any (JSON-serializable) data you want.
submit_recon(c.context, recon1, {"scan_id": 1, "method": "A"})
submit_recon(c.context, recon2, {"scan_id": 1, "method": "B"})
submit_recon(c.context, recon3, {"scan_id": 2, "method": "A"})

print("searching for reconstructions corresponding to scan_id 1...")
results = c.search(scan_id(1))
print(f"found {len(results)} results")
# BUG FIX: corrected typo "frist" -> "first" in user-facing output.
print("first result:")
result = results.values_indexer[0]
print("array:", result[:])  # numpy array
print("metadata:", result.metadata)  # dict of metadata
import collections.abc
from dataclasses import dataclass
import json
from tiled.queries import register
@register(name="raw_mongo")
@dataclass
class RawMongo:
    """
    Run a MongoDB query against a given collection.
    """

    # A JSON-encoded filter document; dicts cannot travel in a URL.
    query: str

    def __init__(self, query):
        # Accept either a ready-made JSON string or a dict to serialize.
        self.query = query if isinstance(query, str) else json.dumps(query)
def scan_id(number):
    """Return a RawMongo query matching ``metadata.scan_id == number``."""
    filter_doc = {"metadata.scan_id": int(number)}
    return RawMongo(filter_doc)
dask[array]
fastapi
h5py
numpy
pydantic
pymongo
tiled[server]
import collections.abc
import json
import os
import uuid
from pathlib import Path
from typing import Any, Dict, Optional
import dask.array
import h5py
import numpy
import pydantic
import pymongo
from fastapi import APIRouter, HTTPException, Request, Security
from tiled.adapters.array import ArrayAdapter
from tiled.adapters.utils import IndexersMixin, tree_repr
from tiled.server.core import json_or_msgpack
from tiled.server.dependencies import entry
from tiled.structures.array import ArrayStructure
from tiled.query_registration import QueryTranslationRegistry
# from tiled.structures.array import ArrayStructure
from tiled.utils import UNCHANGED
from queries import RawMongo # local file in this gist
class Document(pydantic.BaseModel):
    """Schema of one reconstruction record in MongoDB."""

    active: bool  # True once array data has been uploaded
    uid: str  # server-assigned unique identifier
    metadata: Dict  # freeform, client-supplied
    structure: Any  # TODO ArrayStructure
    path: Optional[str]  # HDF5 file path, set when active
class PostMetadataRequest(pydantic.BaseModel):
    """Request body for POST /recon/: metadata plus declared array structure."""

    metadata: Dict
    structure: Any  # TODO ArrayStructure
class PostMetadataResponse(pydantic.BaseModel):
    """Response body for POST /recon/: the uid of the new record."""

    uid: str
router = APIRouter()
@router.post("/recon/{path:path}", response_model=PostMetadataResponse)
def post_metadata(
    request: Request,
    body: PostMetadataRequest,
    entry=Security(entry, scopes=["write:data", "write:metadata"]),
):
    """Register a reconstruction's metadata and structure; return its uid."""
    # Only the MongoAdapter root accepts new reconstruction records.
    if not isinstance(entry, MongoAdapter):
        raise HTTPException(
            status_code=404, detail="This path cannot accept reconstruction metadata."
        )
    new_uid = entry.post_metadata(metadata=body.metadata, structure=body.structure)
    return json_or_msgpack(request, {"uid": new_uid})
@router.put("/recon/{path:path}")
async def put_data(
    request: Request,
    entry=Security(entry, scopes=["write:data", "write:metadata"]),
):
    """Upload the raw array bytes for an existing reconstruction."""
    # Only an individual reconstruction entry can receive array data.
    if isinstance(entry, ReconAdapter):
        payload = await request.body()
        entry.put_data(payload)
    else:
        raise HTTPException(
            status_code=404, detail="This path cannot accept reconstruction data."
        )
def raise_if_inactive(method):
    """Decorator for ReconAdapter methods that need array data.

    Raises ValueError if the adapter has no active array data
    (i.e. ``self.array_adapter is None``); otherwise delegates to *method*.
    """
    from functools import wraps

    # BUG FIX: use functools.wraps so the wrapped method keeps its
    # __name__ and __doc__ (the original wrapper discarded them).
    @wraps(method)
    def inner(self, *args, **kwargs):
        if self.array_adapter is None:
            raise ValueError("Not active")
        return method(self, *args, **kwargs)

    return inner
class ReconAdapter:
    """Tiled adapter for a single reconstruction (one HDF5-backed array).

    Wraps one Mongo document (see ``Document``). Until array data has been
    uploaded (``doc.active`` is False) the adapter is "inactive" and its
    data-access methods raise ValueError.
    """

    structure_family = "array"

    def __init__(self, collection, directory, doc):
        """
        Parameters
        ----------
        collection : pymongo collection of reconstruction documents
        directory : Path under which HDF5 files are stored
        doc : dict matching the ``Document`` model
        """
        self.collection = collection
        self.directory = directory
        self.doc = Document(**doc)
        self.array_adapter = None
        if self.doc.active:
            # Open read-only (mode made explicit). NOTE(review): the handle
            # is intentionally left open for the adapter's lifetime so dask
            # can read lazily from the dataset.
            file = h5py.File(self.doc.path, "r")
            dataset = file["data"]
            self.array_adapter = ArrayAdapter(dask.array.from_array(dataset))

    @property
    def structure(self):
        """Array structure as declared at POST time."""
        return ArrayStructure.from_json(self.doc.structure)

    @property
    def metadata(self):
        """Freeform metadata submitted with the reconstruction."""
        return self.doc.metadata

    @raise_if_inactive
    def read(self, *args, **kwargs):
        return self.array_adapter.read(*args, **kwargs)

    @raise_if_inactive
    def read_block(self, *args, **kwargs):
        return self.array_adapter.read_block(*args, **kwargs)

    # BUG FIX: guard these like read/read_block so an inactive adapter raises
    # ValueError("Not active") instead of AttributeError on None.
    @raise_if_inactive
    def microstructure(self):
        return self.array_adapter.microstructure()

    @raise_if_inactive
    def macrostructure(self):
        return self.array_adapter.macrostructure()

    def put_data(self, body):
        """Persist raw array bytes to an HDF5 file and mark the doc active.

        The bytes are interpreted using the dtype and shape declared in the
        structure at POST time.
        """
        # Organize files into subdirectories with the first two
        # characters of the uid to avoid one giant directory.
        path = self.directory / self.doc.uid[:2] / self.doc.uid
        path.parent.mkdir(parents=True, exist_ok=True)
        array = numpy.frombuffer(
            body, dtype=self.structure.micro.to_numpy_dtype()
        ).reshape(self.structure.macro.shape)
        with h5py.File(path, "w") as file:
            file.create_dataset("data", data=array)
        self.collection.update_one(
            {"uid": self.doc.uid},
            {"$set": {"path": str(path), "active": True}},
        )
class MongoAdapter(collections.abc.Mapping, IndexersMixin):
    """Tiled tree of reconstructions backed by MongoDB metadata + HDF5 data.

    Mapping keys are reconstruction uids; values are ReconAdapter instances.
    Only "active" documents (those whose array data has been uploaded) are
    listed by iteration/search.
    """

    structure_family = "node"
    # HTTP routes for submitting metadata (POST) and array data (PUT).
    include_routers = [router]

    # Registry mapping Tiled query types to functions that apply them.
    query_registry = QueryTranslationRegistry()
    register_query = query_registry.register

    def __init__(
        self,
        *,
        database,
        directory,
        queries=None,
        sorting=None,
        metadata=None,
        principal=None,
        access_policy=None,
    ):
        """
        Parameters
        ----------
        database :
            pymongo database containing the 'reconstructions' collection.
        directory : str or Path
            Existing, writeable directory where HDF5 files are written.
        queries : list[dict], optional
            Mongo filter documents narrowing this view of the tree.
        sorting : list[tuple], optional
            (key, direction) pairs; defaults to ascending metadata.scan_id.
        metadata : dict, optional
            Metadata for this node.
        principal, access_policy :
            Authentication hooks (access policies are not supported).
        """
        self.database = database
        self.collection = database["reconstructions"]
        self.directory = Path(directory).resolve()
        if not self.directory.exists():
            raise ValueError(f"Directory {self.directory} does not exist.")
        if not self.directory.is_dir():
            raise ValueError(
                f"The given directory path {self.directory} is not a directory."
            )
        if not os.access(self.directory, os.W_OK):
            # BUG FIX: this message was missing its f-prefix, so it printed
            # the literal text "{self.directory}".
            raise ValueError(f"Directory {self.directory} is not writeable.")
        self.queries = queries or []
        self.sorting = sorting or [("metadata.scan_id", 1)]
        self.metadata = metadata or {}
        self.principal = principal
        self.access_policy = access_policy
        super().__init__()

    @classmethod
    def from_uri(cls, uri, directory, *, metadata=None):
        """Alternate constructor from a MongoDB URI that names a database."""
        if not pymongo.uri_parser.parse_uri(uri)["database"]:
            raise ValueError(
                f"Invalid URI: {uri!r} " f"Did you forget to include a database?"
            )
        client = pymongo.MongoClient(uri)
        database = client.get_database()
        return cls(database=database, directory=directory, metadata=metadata)

    def new_variation(
        self,
        metadata=UNCHANGED,
        queries=UNCHANGED,
        sorting=UNCHANGED,
        principal=UNCHANGED,
        **kwargs,
    ):
        """Return a copy of this adapter with the given attributes replaced."""
        if metadata is UNCHANGED:
            metadata = self.metadata
        if queries is UNCHANGED:
            queries = self.queries
        if sorting is UNCHANGED:
            sorting = self.sorting
        if principal is UNCHANGED:
            principal = self.principal
        return type(self)(
            database=self.database,
            directory=self.directory,
            metadata=metadata,
            queries=queries,
            sorting=sorting,
            access_policy=self.access_policy,
            principal=principal,
            **kwargs,
        )

    def post_metadata(self, metadata, structure):
        """Insert an inactive document; array data arrives later via PUT."""
        uid = str(uuid.uuid4())
        self.collection.insert_one(
            {
                "uid": uid,
                "metadata": metadata,
                "structure": structure,
                "path": None,  # filled in by ReconAdapter.put_data
                "active": False,
            }
        )
        return uid

    def authenticated_as(self, identity):
        if self.principal is not None:
            raise RuntimeError(f"Already authenticated as {self.principal}")
        if self.access_policy is not None:
            raise NotImplementedError("No support for Access Policy")
        return self

    def _build_mongo_query(self, *queries):
        """Combine this view's stored filters with *queries* under $and."""
        combined = self.queries + list(queries)
        if combined:
            return {"$and": combined}
        else:
            return {}

    def __getitem__(self, key):
        query = {"uid": key}
        doc = self.collection.find_one(self._build_mongo_query(query), {"_id": False})
        if doc is None:
            raise KeyError(key)
        return ReconAdapter(self.collection, self.directory, doc)

    def __iter__(self):
        # TODO Apply pagination, as we do in Databroker.
        # Iterate the cursor directly; no need to materialize a list first.
        for doc in self.collection.find(
            self._build_mongo_query({"active": True}), {"uid": True}
        ):
            yield doc["uid"]

    def __len__(self):
        return self.collection.count_documents(
            self._build_mongo_query({"active": True})
        )

    def __length_hint__(self):
        # https://www.python.org/dev/peps/pep-0424/
        # BUG FIX: pymongo's estimated_document_count() accepts no filter
        # argument; the previous call passed one. This is only a hint, so
        # the cheap collection-wide estimate is acceptable.
        return self.collection.estimated_document_count()

    def __repr__(self):
        # Display up to the first N keys to avoid making a giant service
        # request. Use _keys_slicer because it is unauthenticated.
        N = 10
        return tree_repr(self, self._keys_slice(0, N, direction=1))

    def search(self, query):
        """
        Return a MongoAdapter with a subset of the mapping.
        """
        return self.query_registry(query, self)

    def sort(self, sorting):
        return self.new_variation(sorting=sorting)

    def _keys_slice(self, start, stop, direction):
        assert direction == 1, "direction=-1 should be handled by the client"
        skip = start or 0
        if stop is not None:
            limit = stop - skip
        else:
            # BUG FIX: pymongo requires an int here; 0 means "no limit"
            # (passing None raises TypeError).
            limit = 0
        for doc in self.collection.find(
            self._build_mongo_query({"active": True}),
            skip=skip,
            limit=limit,
        ):
            yield doc["uid"]

    def _items_slice(self, start, stop, direction):
        assert direction == 1, "direction=-1 should be handled by the client"
        skip = start or 0
        if stop is not None:
            limit = stop - skip
        else:
            # BUG FIX: pymongo requires an int here; 0 means "no limit".
            limit = 0
        for doc in self.collection.find(
            self._build_mongo_query({"active": True}),
            skip=skip,
            limit=limit,
        ):
            # BUG FIX: pass self.collection (not self.database) so the
            # ReconAdapter gets the object it expects, matching __getitem__.
            yield (doc["uid"], ReconAdapter(self.collection, self.directory, doc))

    def _item_by_index(self, index, direction):
        assert direction == 1, "direction=-1 should be handled by the client"
        doc = next(
            self.collection.find(
                self._build_mongo_query({"active": True}),
                skip=index,
                limit=1,
            )
        )
        # BUG FIX: pass self.collection (not self.database), as above.
        return (doc["uid"], ReconAdapter(self.collection, self.directory, doc))
def raw_mongo(query, catalog):
    """Apply a RawMongo query: return *catalog* narrowed by the decoded filter."""
    narrowed = catalog.queries + [json.loads(query.query)]
    return catalog.new_variation(queries=narrowed)
MongoAdapter.register_query(RawMongo, raw_mongo)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment