Skip to content

Instantly share code, notes, and snippets.

@pkpinto
Last active July 28, 2021 11:13
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pkpinto/08cfdb13eb0a0240e14e1eb6e7a7fa85 to your computer and use it in GitHub Desktop.
Save pkpinto/08cfdb13eb0a0240e14e1eb6e7a7fa85 to your computer and use it in GitHub Desktop.
# coding: utf-8
from contextlib import contextmanager
import logging
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
class SQLSource:
"""
sql_source = SQLSource()
with sql_source() as session:
with SQLSource.transaction_scope(session) as session:
session.query(...)
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.connection_string = ...
self.db_name = self.connection_string.split('@')[1].split(':')[0]
self.engine = create_engine(self.connection_string)
self.Session = scoped_session(sessionmaker(bind=self.engine))
@contextmanager
def __call__(self):
try:
self.logger.debug('Scoped session to database ' + self.db_name)
yield self.Session()
finally:
self.Session.remove()
return
@staticmethod
@contextmanager
def transaction_scope(session):
try:
yield session
lself.logger.debug('Session commit')
session.commit()
except:
self.logger.debug('Session rollback')
session.rollback()
raise
finally:
self.logger.debug('Session close')
session.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment