Skip to content

Instantly share code, notes, and snippets.

@dario61081
Last active January 26, 2023 16:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dario61081/d31637b845e9fc18867e9bf0cef835c3 to your computer and use it in GitHub Desktop.
Save dario61081/d31637b845e9fc18867e9bf0cef835c3 to your computer and use it in GitHub Desktop.
Database class for SQLAlchemy database manager
import hashlib
from sqlalchemy import create_engine, MetaData, text
from sqlalchemy.exc import DatabaseError
from sqlalchemy.orm import sessionmaker, declarative_base
class Database:
    """Small convenience wrapper around a SQLAlchemy engine and one open connection.

    The instance builds a connection URI from keyword arguments, creates an
    engine from it, opens a single connection (``self.cn``) that the
    ``execute`` / ``fetch_*`` helpers share, and exposes helpers to create
    sessions and to create, seed and drop a declarative schema.

    The object can be used as a context manager; leaving the ``with`` block
    calls :meth:`close`.
    """

    def __init__(self, **kwargs):
        """Build the URI, create the engine and open the shared connection.

        Keyword Args:
            username: Login name. Defaults to ``'root'``.
            password: Login password. Defaults to ``''``.
            host: Server host. Defaults to ``'localhost'``.
            port: Server port. Defaults to ``'3306'``.
            database: Database (schema) name. Defaults to ``''``.
            driver: SQLAlchemy ``dialect+driver`` prefix. Defaults to
                ``'mysql+pymysql'``.

        Note:
            Every value is interpolated into the URI verbatim. A username or
            password containing URI-reserved characters (``@``, ``/``, ``:``
            ...) must therefore be percent-encoded by the caller.
        """
        self.username = kwargs.get('username', 'root')
        self.password = kwargs.get('password', '')
        self.host = kwargs.get('host', 'localhost')
        self.port = kwargs.get('port', '3306')
        self.database = kwargs.get('database', '')
        self.driver = kwargs.get('driver', 'mysql+pymysql')
        self.uri = f"{self.driver}://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}"
        # self.uri embeds the clear-text password, so only a masked form is
        # echoed; printing the real URI would leak the secret to stdout/logs.
        print(f"{self.driver}://{self.username}:***@{self.host}:{self.port}/{self.database}")
        self.engine = create_engine(self.uri, poolclass=None)
        self.cn = self.engine.connect()
        # NOTE(review): ``bind=`` is the SQLAlchemy 1.x "bound metadata" API;
        # confirm the installed version still accepts it before upgrading.
        self.metadata = MetaData(bind=self.engine)
        # self.metadata.reflect(bind=self.engine)

    def __enter__(self):
        """Return the instance itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release database resources; exceptions are not suppressed."""
        self.close()
        return False

    def close(self) -> None:
        """Close the shared connection and dispose of the engine."""
        self.cn.close()
        self.engine.dispose()

    def execute(self, query: str, **kwargs):
        """Run a textual SQL statement on the shared connection.

        Args:
            query: SQL text, wrapped in ``text()`` before execution. Values
                should be referenced through ``:name`` placeholders.
            **kwargs: Bound-parameter values keyed by placeholder name.

        Returns:
            Whatever ``Connection.execute`` returns for the statement.
        """
        # The parameters travel as ONE mapping. Spreading them as keyword
        # arguments is the legacy calling style, which newer SQLAlchemy
        # releases no longer accept; a single dict works on both.
        return self.cn.execute(text(query), kwargs)

    def fetch_all(self, query: str, **kwargs):
        """Execute ``query`` and return every row (``fetchall()``)."""
        return self.execute(query, **kwargs).fetchall()

    def fetch_one(self, query: str, **kwargs):
        """Execute ``query`` and return the next row (``fetchone()``)."""
        return self.execute(query, **kwargs).fetchone()

    def fetch_scalar(self, query: str, **kwargs):
        """Execute ``query`` and return the result's ``scalar()`` value."""
        return self.execute(query, **kwargs).scalar()

    def get_session(self):
        """Return a new ORM session bound to this instance's engine.

        The caller owns the returned session and is responsible for closing it.
        """
        return sessionmaker(bind=self.engine)()

    def secure_password(self, passwrd: str) -> str:
        """Return the hexadecimal SHA-512 digest of ``passwrd``.

        SECURITY NOTE(review): this is a single unsalted hash pass. The output
        is kept unchanged so that previously stored digests still match, but a
        dedicated password-hashing scheme (salted, deliberately slow) should be
        considered for new data.
        """
        return hashlib.sha512(passwrd.encode()).hexdigest()

    def create_schema(self, a_base: type) -> None:
        """Create all tables of ``a_base`` and load each model's seed rows.

        After ``a_base.metadata.create_all``, every *direct* subclass of
        ``a_base`` that exposes a ``data`` attribute is instantiated, its
        ``data()`` result is iterated, and each row is merged and committed in
        a session of its own.

        A ``DatabaseError`` while seeding one model is reported, rolled back
        and does not prevent the remaining models from being seeded.

        Args:
            a_base: The declarative base class whose metadata and subclasses
                are processed.
        """
        a_base.metadata.create_all(bind=self.engine, checkfirst=True)
        for class_item in a_base.__subclasses__():
            if not hasattr(class_item, "data"):
                continue
            session = self.get_session()
            try:
                print(f"found inicial data for {class_item}")
                for row in class_item().data():
                    session.merge(row)
                session.commit()
            except DatabaseError as exc:
                # Best effort: discard this model's partial work, say why,
                # and carry on with the next model.
                session.rollback()
                print(f"could not load initial data for {class_item}: {exc}")
            finally:
                # One session is opened per model; always release it so its
                # connection is returned instead of leaking.
                session.close()

    def drop_schema(self, a_base: type) -> None:
        """Drop every table registered on ``a_base.metadata`` from the engine."""
        a_base.metadata.drop_all(bind=self.engine)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment