Skip to content

Instantly share code, notes, and snippets.

@miohtama
Created September 28, 2022 16:29
Show Gist options
  • Save miohtama/1139f3e1a316831e1b439ba3636f7f5b to your computer and use it in GitHub Desktop.
Save miohtama/1139f3e1a316831e1b439ba3636f7f5b to your computer and use it in GitHub Desktop.
Reflected view cache for SQLAlchemy - for views that you are not building in Python by hand
"""Handle caching of continuous aggregate views as SQLAlchemy Table objects.
Because reflecting on the spot is too slow: https://github.com/tradingstrategy-ai/backend/pull/63
"""
import logging
from typing import List, Optional, Dict
from sqlalchemy import inspect, MetaData, inspection
from sqlalchemy.engine import Engine, Inspector
from sqlalchemy.orm import Session
from sqlalchemy.schema import Table
from dex_ohlcv.models import Base
logger = logging.getLogger(__name__)
def get_cache_container(metadata: MetaData) -> Dict[str, Table]:
    """Return the dict that holds cached views for ``metadata``.

    The dict is stored as the ``view_cache`` attribute of the metadata
    instance itself, because reflection is bound to the lifetime of the
    metadata. It is created empty the first time it is requested (or when
    the attribute is present but ``None``).

    :param metadata:
        Object on which the ``view_cache`` attribute is read and, if needed, set.

    :return:
        The same dict object on every call for a given ``metadata``.
    """
    cache = getattr(metadata, "view_cache", None)
    if cache is None:
        # First access: attach a fresh, empty cache to the metadata.
        cache = metadata.view_cache = {}
    return cache
def reflect_views_only(bind: Engine, metadata: MetaData) -> List[Table]:
    """Reflect every view reported by the database into a ``Table`` object.

    Modelled on ``MetaData.reflect``, but restricted to the names returned
    by the inspector's ``get_view_names()``.

    :param bind:
        Engine to inspect.

    :param metadata:
        ``MetaData`` instance each reflected ``Table`` is created against.

    :return:
        One ``Table`` per view name, in the order the inspector listed them.
    """
    # NOTE(review): _inspection_context() is an underscore-prefixed SQLAlchemy
    # helper, presumably borrowed from MetaData.reflect - confirm it is still
    # available when upgrading SQLAlchemy.
    with inspection.inspect(bind)._inspection_context() as inspector:
        reflected = [
            Table(view_name, metadata, autoload_with=inspector)
            for view_name in inspector.get_view_names()
        ]
    return reflected
# We should use a cache instead of invoking the reflection every time, as the latter
# is a *very* expensive operation.
# https://github.com/tradingstrategy-ai/backend/issues/57
def build_db_view_cache(engine: Engine, metadata: MetaData, force_reflect: bool = False) -> Dict[str, Table]:
    """Create a name -> ``Table`` mapping of the database views for much faster lookups.

    Invoking the reflection on every lookup is *very* expensive, thus callers
    should keep the returned mapping as a cache, especially because the
    database schema does not change while the application is running.

    See: https://github.com/tradingstrategy-ai/backend/issues/57

    :param engine:
        Engine used to inspect the database.

    :param metadata:
        ``MetaData`` instance passed on to ``reflect_views_only()``.

    :param force_reflect:
        Currently ignored. This function calls ``reflect_views_only()`` on
        every invocation regardless of this flag, so it always acts as a
        refresh. The parameter is kept so existing callers keep working.

    :return:
        Dict mapping each view's name to its ``Table`` object.
    """
    return {view.name: view for view in reflect_views_only(engine, metadata)}
def get_views_names_for_session(dbsession: Session) -> List[str]:
    """Return the names of the views the session's database reports.

    The session's ``bind`` is inspected and its ``get_view_names()`` result
    is returned as-is.

    :param dbsession:
        Session whose ``bind`` is inspected.

    :return:
        View names as returned by the inspector.
    """
    return inspect(dbsession.bind).get_view_names()
def get_view_by_name(dbsession: Session, name: str, cache=True) -> Optional["Table"]:
    """Look up a reflected view by its name.

    Views appear as `sqlalchemy.Table` objects. The lookup goes through the
    view cache stored on ``Base.metadata``; the cache is (re)built from the
    database when it is empty or when ``cache`` is false.

    Should be multithread safe.

    :param dbsession:
        Current connection and its engine

    :param name:
        view name

    :param cache:
        Use fast view cache.
        Set false to regenerate cache.
        See refresh_last_trade_materialized_view() hack.

    :return:
        The cached ``Table`` for ``name``, or ``None`` if no such view is known.
    """
    view_cache = get_cache_container(Base.metadata)
    if not (cache and view_cache):
        # The shared dict is updated in place rather than replaced: existing
        # keys are overwritten and new keys are added, nothing is removed.
        # The original author relies on this being multithread/multiprocess
        # safe in CPython: https://stackoverflow.com/a/6955678/315168
        view_cache.update(build_db_view_cache(dbsession.bind, Base.metadata))
    return view_cache.get(name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment