Skip to content

Instantly share code, notes, and snippets.

@MihanixA
Created March 23, 2021 15:33
Show Gist options
  • Save MihanixA/d31f6790f4f0692e49968c98870cbe78 to your computer and use it in GitHub Desktop.
Save MihanixA/d31f6790f4f0692e49968c98870cbe78 to your computer and use it in GitHub Desktop.
import time
import uuid
from typing import Iterator
from collections import deque
from contextlib import contextmanager
import threading
import traceback
import sqlalchemy
import sqlalchemy.event
import sqlalchemy.orm
from sqlalchemy.orm import Session
# importing logger class
from common.logger import Logger
__all__ = (
'DatabaseIsolationLevel',
'DatabaseMixin',
'uuid_simple'
)
class DatabaseIsolationLevel:
read_committed = 'READ_COMMITTED'
repeatable_read = 'REPEATABLE_READ'
serializable = 'SERIALIZABLE'
default = read_committed
def uuid_simple():
return uuid.uuid4().hex[:8]
class DatabaseMixin:
__session_class__ = Session
__db_thread_local = threading.local()
__db_logger = Logger('db')
def __init_subclass__(cls, db_url=None, db_base=None, **kwargs):
super().__init_subclass__(**kwargs)
cls._db_url = db_url
cls._db_base = db_base
def engine(self, isolation_level=None) -> sqlalchemy.engine.Engine:
isolation_level = isolation_level or DatabaseIsolationLevel.default
key = (self._db_url, isolation_level)
if not hasattr(self.__db_thread_local, 'engines'):
self.__db_thread_local.engines = {}
engine = self.__db_thread_local.engines.get(key, None)
if engine is None:
engine = sqlalchemy.create_engine(self._db_url, isolation_level=isolation_level,
pool_recycle=500, pool_size=30, pool_pre_ping=True)
self._db_base.metadata.create_all(engine)
def cut_long(statement, length: int = 3000):
statement = str(statement).replace('\n', ' ')
while ' ' in statement:
statement = statement.replace(' ', ' ')
return statement[:length]
@sqlalchemy.event.listens_for(engine, 'before_cursor_execute')
def before_cursor_execute(conn, _, statement, params, ____, _____):
query_id = uuid_simple()
conn.info.setdefault('query_start_time', deque()).appendleft(time.time())
conn.info.setdefault('query_id', deque()).appendleft(query_id)
self.__db_logger.debug(f'Query started', query_id=query_id, statement=cut_long(statement),
parameters=cut_long(params))
@sqlalchemy.event.listens_for(engine, 'after_cursor_execute')
def after_cursor_execute(conn, _, __, ___, ____, _____):
query_id = conn.info['query_id'].pop()
elapsed = time.time() - conn.info['query_start_time'].pop()
elapsed_ms = int(elapsed * 1000)
self.__db_logger.debug(f'Query completed', query_id=query_id, elapsed_ms=elapsed_ms)
self.__db_logger.info(f'Connected to database', db_url=repr(engine), isolation_level=isolation_level)
self.__db_thread_local.engines[key] = engine
return engine
def sessionmaker(self, isolation_level=None) -> sqlalchemy.orm.sessionmaker:
isolation_level = isolation_level or DatabaseIsolationLevel.default
key = (self._db_url, isolation_level)
if not hasattr(self.__db_thread_local, 'sessionmakers'):
self.__db_thread_local.sessionmakers = {}
sessionmaker = self.__db_thread_local.sessionmakers.get(key, None)
if sessionmaker is None:
sessionmaker = sqlalchemy.orm.sessionmaker(bind=self.engine(isolation_level=isolation_level),
class_=self.__session_class__)
self.__db_thread_local.sessionmakers[key] = sessionmaker
return sessionmaker
@contextmanager
def connect(self, autoflush=False, autocommit=False, expire_on_commit=False,
isolation_level=None) -> Iterator[Session]:
"""
Example usage:
db = DatabaseETL()
with db.connect() as session:
do_something(session)
:param autoflush: When ``True``, all query operations will issue a
:meth:`~.Session.flush` call to this ``Session`` before proceeding.
This is a convenience feature so that :meth:`~.Session.flush` need
not be called repeatedly in order for database queries to retrieve
results. It's typical that ``autoflush`` is used in conjunction
with ``autocommit=False``. In this scenario, explicit calls to
:meth:`~.Session.flush` are rarely needed; you usually only need to
call :meth:`~.Session.commit` (which flushes) to finalize changes.
:param autocommit:
.. warning::
The autocommit flag is **not for general use**, and if it is
used, queries should only be invoked within the span of a
:meth:`.Session.begin` / :meth:`.Session.commit` pair. Executing
queries outside of a demarcated transaction is a legacy mode
of usage, and can in some cases lead to concurrent connection
checkouts.
Defaults to ``False``. When ``True``, the
:class:`.Session` does not keep a persistent transaction running,
and will acquire connections from the engine on an as-needed basis,
returning them immediately after their use. Flushes will begin and
commit (or possibly rollback) their own transaction if no
transaction is present. When using this mode, the
:meth:`.Session.begin` method is used to explicitly start
transactions.
:param expire_on_commit: Defaults to ``False``. When ``True``, all
instances will be fully expired after each :meth:`~.commit`,
so that all attribute/object access subsequent to a completed
transaction will load from the most recent database state.
:param isolation_level: this string parameter is interpreted by various
dialects in order to affect the transaction isolation level of the
database connection. The parameter essentially accepts some subset of
these string arguments: ``"SERIALIZABLE"``, ``"REPEATABLE_READ"``,
``"READ_COMMITTED"``, ``"READ_UNCOMMITTED"`` and ``"AUTOCOMMIT"``.
Behavior here varies per backend, and
individual dialects should be consulted directly.
"""
transaction_id = uuid_simple()
session: Session = self.sessionmaker(isolation_level)(autoflush=autoflush, autocommit=autocommit,
expire_on_commit=expire_on_commit)
self.__db_logger.debug('Transaction started', transaction_id=transaction_id)
try:
yield session
session.commit()
except BaseException as e:
self.__db_logger.warning(f'Rolling back db transaction {transaction_id}', exception=e,
traceback=traceback.format_exc())
session.rollback()
raise
finally:
session.close()
self.__db_logger.debug('Transaction finished', transaction_id=transaction_id)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment