Skip to content

Instantly share code, notes, and snippets.

@javiergarciad
Last active May 28, 2023 20:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save javiergarciad/52c3270922d1009f5d28a1ad750c6e18 to your computer and use it in GitHub Desktop.
Save javiergarciad/52c3270922d1009f5d28a1ad750c6e18 to your computer and use it in GitHub Desktop.
SQLAlchemy Database Manager Class
import logging
import os
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import List

import sqlalchemy
from sqlalchemy import create_engine, select
from sqlalchemy import Column, Integer, DateTime
from sqlalchemy import func, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# logging setup
# NOTE: basicConfig runs at import time and configures the root logger at
# DEBUG level for every program that imports this module.
logging.basicConfig(level=logging.DEBUG)
# Module-level logger used by every method of DB below.
logger = logging.getLogger(__name__)
# Declarative base: models deriving from it (e.g. Version) register their
# tables in Base.metadata, which DB uses to create, drop and compare schemas.
Base = declarative_base()
class Version(Base):
    """
    Single-row bookkeeping table recording when the database was created
    and last updated.
    """

    __tablename__ = 'version'

    id = Column(Integer, primary_key=True)
    created = Column(DateTime)
    updated = Column(DateTime)

    def to_dict(self):
        """
        Return the row as a plain dictionary.

        DB.get_status() calls this method on the stored version row; without
        it that call fails with AttributeError.

        :return: dict with the keys "id", "created" and "updated".
        """
        return {
            "id": self.id,
            "created": self.created,
            "updated": self.updated,
        }
class DB:
    """
    Manage a SQLite database through SQLAlchemy.

    :param name: file name of the database (i.e. data.db); when None the
        DATABASE_NAME environment variable is used.
    :param dirpath: directory path of the database; when None the
        DATABASE_DIRPATH environment variable is used.
    :param echo: echo SQLAlchemy logging.
    :raises ValueError: if the name or the directory cannot be resolved.
    """

    def __init__(self, name=None, dirpath=None, echo=False, ) -> None:
        self.name = os.environ.get("DATABASE_NAME") if name is None else name
        self._dirpath = (
            os.environ.get("DATABASE_DIRPATH") if dirpath is None else dirpath
        )
        # Fail early: an unresolved value would otherwise be formatted into
        # the path as the literal text "None" (e.g. "None/None").
        if not self.name or not self._dirpath:
            raise ValueError(
                "Database name and directory are required: pass name/dirpath "
                "or set DATABASE_NAME/DATABASE_DIRPATH."
            )
        self.echo = echo
        self.path = f"{self._dirpath}/{self.name}"
        self.uri = f"sqlite:///{self.path}"
        # Engine cache, see engine(); the key remembers which settings the
        # cached engine was built from.
        self._engine = None
        self._engine_key = None

    def _dispose_engine(self):
        """Dispose of the cached engine, if any, and forget it."""
        if self._engine is not None:
            self._engine.dispose()
        self._engine = None
        self._engine_key = None

    @staticmethod
    def _version_to_dict(version):
        """Return the columns of a Version row as a plain dictionary."""
        return {
            "id": version.id,
            "created": version.created,
            "updated": version.updated,
        }

    def engine(self):
        """
        Return the database engine.

        The engine is created on first use and reused afterwards, instead of
        building a new (never disposed) engine on every call. It is rebuilt
        if ``uri`` or ``echo`` were changed since it was created.

        :return: SQLAlchemy engine instance.
        :raises SystemExit: if the engine cannot be created.
        """
        cache_key = (self.uri, self.echo)
        if self._engine is not None and self._engine_key == cache_key:
            return self._engine
        self._dispose_engine()
        try:
            self._engine = create_engine(self.uri, echo=self.echo)
        except SQLAlchemyError as e:
            logger.critical(e)
            raise SystemExit(1) from e
        self._engine_key = cache_key
        return self._engine

    @contextmanager
    def session(self):
        """
        Context manager for providing a session.

        Commits when the block finishes normally, rolls back on a SQLAlchemy
        error and always closes the session.

        :return: SQLAlchemy session instance.
        :raises SystemExit: if a SQLAlchemy error occurs inside the block or
            during commit.
        """
        # Created before the try block so that the except/finally clauses
        # never reference an unbound ``session`` name.
        session = sessionmaker(bind=self.engine())()
        try:
            yield session
            session.commit()
        except SQLAlchemyError as e:
            logger.critical(e)
            session.rollback()
            raise SystemExit(1) from e
        finally:
            # For any other exception the pending work is discarded here.
            session.close()

    def get_status(self):
        """
        Check database status and schema.

        :return: ``{"db": None, "status": None, "version": None}`` if the
            database file does not exist; the stored version row as a dict
            (keys "id", "created", "updated") if the schema matches the
            models; False if the schema differs or no version row exists.
        :raises SystemExit: on a SQLAlchemy error.
        """
        # check if the database file exists
        if not Path(self.path).is_file():
            return {"db": None, "status": None, "version": None}
        # if there is a file check the schema
        try:
            inspector = sqlalchemy.inspect(self.engine())
            # Compare the schema with the expected schema
            expected_tables = set(Base.metadata.tables.keys())
            actual_tables = set(inspector.get_table_names())
            if expected_tables != actual_tables:
                logger.error('schema_mismatch')
                return False
            with self.session() as s:
                version = s.query(Version).first()
                if version is None:
                    logger.error('version_missing')
                    return False
                # Read the columns while the session is still open.
                return self._version_to_dict(version)
        except SQLAlchemyError as e:
            logger.critical(e)
            raise SystemExit(1) from e

    def get_tables_info(self):
        """
        Get the number of rows in each table of the database.

        :return: Dictionary of the form ``{"rows": {table_name: row_count}}``.
        :raises SystemExit: on any error while inspecting or counting.
        """
        try:
            inspector = sqlalchemy.inspect(self.engine())
            table_row_counts = {}
            with self.session() as s:
                # Table names come from the inspector, not from user input.
                for table_name in inspector.get_table_names():
                    stmt = select(func.count()).select_from(text(table_name))
                    table_row_counts[table_name] = s.execute(stmt).scalar()
            return {"rows": table_row_counts}
        except Exception as e:
            logger.critical(e)
            raise SystemExit(1) from e

    def create_database(self, overwrite=False):
        """
        Create the database and all the tables.

        :param overwrite: when the database file already exists, False
            cancels the creation; True drops the model tables (deleting
            their rows) and recreates them.
        :return: True if successful, False if the creation was cancelled.
        :raises SystemExit: on a SQLAlchemy error.
        """
        db_exists = Path(self.path).is_file()
        if db_exists and not overwrite:
            logger.warning(f"Database already exists at '{self.path}', overwrite=False")
            logger.warning("Database creation cancelled.")
            return False
        try:
            engine = self.engine()
            if db_exists:
                # Start from empty tables; otherwise inserting the version
                # row with id=1 collides with the one already stored.
                logger.warning(f"Overwriting existing database at '{self.path}'")
                Base.metadata.drop_all(engine)
            Base.metadata.create_all(engine)
            now = datetime.utcnow()
            with self.session() as s:
                s.add(Version(id=1, created=now, updated=now))
        except SQLAlchemyError as e:
            logger.critical(e)
            raise SystemExit(1) from e
        # Logged only after the session has committed.
        logger.info(f"Database created at {self.path}")
        return True

    def drop_tables(self, confirm=False):
        """
        Drop all tables stored in this metadata.
        Conditional by default, will not attempt to drop tables not present in the target database.

        :param confirm: Whether to confirm the table drop.
        :return: True if successful, False if not confirmed.
        :raises SystemExit: on a SQLAlchemy error.
        """
        if not confirm:
            logger.warning("Database tables drop cancelled. You must confirm!")
            return False
        try:
            Base.metadata.drop_all(self.engine())
        except SQLAlchemyError as e:
            logger.critical(e)
            raise SystemExit(1) from e
        logger.info(f"Database tables dropped at '{self.path}'")
        return True

    def delete_database(self, confirm=False):
        """
        Delete the database file.

        :param confirm: Whether to confirm the database deletion.
        :return: True if the file was deleted or did not exist, False if not
            confirmed.
        :raises SystemExit: if the file cannot be deleted.
        """
        if not confirm:
            logger.warning("Database deletion cancelled. You must confirm!")
            return False
        db_file = Path(self.path)
        if not db_file.exists():
            return True
        # Release the cached engine's connections before removing the file.
        self._dispose_engine()
        try:
            db_file.unlink()
        except FileNotFoundError:
            # The file vanished between the check and the unlink.
            return True
        except OSError as e:
            logger.error(e)
            raise SystemExit(1) from e
        logger.info(f"Database deleted at '{self.path}'")
        return True

    def populate_database(self, data: List):
        """
        Populate the database with data.

        :param data: List of objects to populate the database.
        :return: True if successful.
        :raises SystemExit: on a SQLAlchemy error.
        """
        try:
            with self.session() as s:
                s.add_all(data)
        except SQLAlchemyError as e:
            logger.error(e)
            raise SystemExit(1) from e
        # Logged only after the session has committed.
        logger.info("Database populated")
        return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment