Skip to content

Instantly share code, notes, and snippets.

Created March 1, 2023 20:28
Show Gist options
  • Save kwilcox/c044123e7658e4f96097d65a865b3287 to your computer and use it in GitHub Desktop.
Save kwilcox/c044123e7658e4f96097d65a865b3287 to your computer and use it in GitHub Desktop.
import yaml
import logging
from typing import Sequence
from starlette.routing import NoMatchFound
from fastapi import APIRouter, Depends, Response, Request
from xpublish.utils.api import DATASET_ID_ATTR_KEY
from xpublish.plugins import Dependencies, Plugin, hookimpl
logger = logging.getLogger('intake_catalog')
def get_dataset_id(ds):
xpublish_id = ds.attrs.get(DATASET_ID_ATTR_KEY)
cf_dataset_id = ".".join(
x for x in [
] if x
dataset_id_options = [
return next(x for x in dataset_id_options if x)
def get_zarr_source(xpublish_id, dataset, request):
url = ''
from xpublish.plugins.included.zarr import ZarrPlugin
url = request.url_for("get_zarr_metadata")
except NoMatchFound:
# On multi-dataset servers add the dataset_id to the route
url = request.url_for("get_zarr_metadata", dataset_id=xpublish_id)
# Remove .zmetadata from the URL to get the root zarr URL
url = url.replace("/.zmetadata", "")
if not url:
return {}
return {
'driver': 'zarr',
'description': dataset.attrs.get('summary', ''),
'args': {
'consolidated': True,
'urlpath': url
class IntakePlugin(Plugin):
"""Adds an Intake catalog endpoint"""
name = 'intake_catalog'
dataset_metadata = dict()
app_router_prefix: str = '/intake'
app_router_tags: Sequence[str] = ['intake']
dataset_router_prefix: str = ''
dataset_router_tags: Sequence[str] = ['intake']
def app_router(self, deps: Dependencies):
"""Register an application level router for app level intake catalog"""
router = APIRouter(prefix=self.app_router_prefix, tags=self.app_router_tags)
def get_request(request: Request) -> str:
return request
@router.get(".yaml", summary="Root intake catalog")
def get_root_catalog(
dataset_ids = Depends(deps.dataset_ids)
data = {
'metadata': {
'source': 'Served via `xpublish-intake`',
'access_url': str(request.url),
if dataset_ids:
data['sources'] = {
d: {
'description': self.dataset_metadata.get(d, {}).get('description', ''),
'driver': 'intake.catalog.local.YAMLFileCatalog',
'metadata': self.dataset_metadata.get(d, {}),
'args': {
'path': request.url_for('get_dataset_catalog', dataset_id=d)
for d in dataset_ids
data['sources'] = {
'dataset': {
'description': self.dataset_metadata.get('default', {}).get('description', ''),
'driver': 'intake.catalog.local.YAMLFileCatalog',
'metadata': self.dataset_metadata.get('default', {}),
'args': {
'path': request.url_for('get_dataset_catalog')
return Response(yaml.dump(data), media_type="text/yaml")
return router
def dataset_router(self, deps: Dependencies):
router = APIRouter(prefix=self.dataset_router_prefix, tags=list(self.dataset_router_tags))
def get_request(request: Request) -> str:
return request
@router.get('/intake.yaml', summary="Dataset intake catalog")
def get_dataset_catalog(
xpublish_id = get_dataset_id(dataset)
sources = {
'zarr': get_zarr_source(xpublish_id, dataset, request)
data = {
'name': xpublish_id,
'metadata': {
'source': 'Served via `xpublish-intake`',
'access_url': str(request.url),
'sources': {
f'{xpublish_id}-{k}': v for k, v in sources.items() if v
return Response(yaml.dump(data), media_type="text/yaml")
return router
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment