Skip to content

Instantly share code, notes, and snippets.

@deeso
Last active November 15, 2021 06:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save deeso/459bcd5221684f4f836402301f0aa48d to your computer and use it in GitHub Desktop.
Save deeso/459bcd5221684f4f836402301f0aa48d to your computer and use it in GitHub Desktop.
Read all MSSQL database schemas and generate ORM code for them [untested but mostly works]
# requires pymssql, sqlalchemy
import argparse
import json
import sys
from datetime import datetime, timedelta

import pymssql
from sqlalchemy import Column
from sqlalchemy import text
from sqlalchemy.dialects import mssql
from sqlalchemy.engine import create_engine, URL
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Command-line interface. Every option uses a single leading dash.
parser = argparse.ArgumentParser(
    description='Generate some ORM code from MSSQL Databases.',
)
# Where the schema description comes from / where the generated code goes.
parser.add_argument(
    '-input',
    type=str,
    default=None,
    help='read previously enumerate ORM Data',
)
parser.add_argument(
    '-output',
    type=str,
    default='generated_orm.py',
    help='write generated ORM code',
)
# Live enumeration: the flag plus the connection details it needs.
parser.add_argument(
    '-enum',
    action="store_true",
    default=False,
    help='enumerate databases and tables from database',
)
parser.add_argument(
    '-hostname',
    default=None,
    help='hostname with databases and tables enumerate from',
)
parser.add_argument(
    '-username',
    default=None,
    help='username for account for database and tables enumerate from',
)
parser.add_argument(
    '-password',
    default=None,
    help='password for account for database and tables enumerate from',
)
# Maps the type name stored for each column (the 'type' field of the schema
# description) to the matching SQLAlchemy MSSQL dialect type class.
column_types = {
    'bigint': mssql.BIGINT,
    'binary': mssql.BINARY,
    'bit': mssql.BIT,
    'char': mssql.CHAR,
    'date': mssql.DATE,
    'datetime': mssql.DATETIME,
    'datetime2': mssql.DATETIME2,
    'datetimeoffset': mssql.DATETIMEOFFSET,
    'decimal': mssql.DECIMAL,
    'float': mssql.FLOAT,
    'image': mssql.IMAGE,
    'int': mssql.INTEGER,
    'money': mssql.MONEY,
    'nchar': mssql.NCHAR,
    'ntext': mssql.NTEXT,
    'numeric': mssql.NUMERIC,
    'nvarchar': mssql.NVARCHAR,
    'real': mssql.REAL,
    'smalldatetime': mssql.SMALLDATETIME,
    'smallint': mssql.SMALLINT,
    'sql_variant': mssql.SQL_VARIANT,
    'text': mssql.TEXT,
    'time': mssql.TIME,
    'tinyint': mssql.TINYINT,
    'uniqueidentifier': mssql.UNIQUEIDENTIFIER,
    'varbinary': mssql.VARBINARY,
    'varchar': mssql.VARCHAR,
    'xml': mssql.XML,
}
# Same keys as column_types, but the values are the *source text* of the type
# expression, ready to be pasted into generated Column(...) definitions.
column_types_names = {
    'bigint': "mssql.BIGINT",
    'binary': "mssql.BINARY",
    'bit': "mssql.BIT",
    'char': "mssql.CHAR",
    'date': "mssql.DATE",
    'datetime': "mssql.DATETIME",
    'datetime2': "mssql.DATETIME2",
    'datetimeoffset': "mssql.DATETIMEOFFSET",
    'decimal': "mssql.DECIMAL",
    'float': "mssql.FLOAT",
    'image': "mssql.IMAGE",
    'int': "mssql.INTEGER",
    'money': "mssql.MONEY",
    'nchar': "mssql.NCHAR",
    'ntext': "mssql.NTEXT",
    'numeric': "mssql.NUMERIC",
    'nvarchar': "mssql.NVARCHAR",
    'real': "mssql.REAL",
    'smalldatetime': "mssql.SMALLDATETIME",
    'smallint': "mssql.SMALLINT",
    'sql_variant': "mssql.SQL_VARIANT",
    'text': "mssql.TEXT",
    'time': "mssql.TIME",
    'tinyint': "mssql.TINYINT",
    'uniqueidentifier': "mssql.UNIQUEIDENTIFIER",
    'varbinary': "mssql.VARBINARY",
    'varchar': "mssql.VARCHAR",
    'xml': "mssql.XML",
}
# Templates used to generate the declarative table code.
#
# DEF_PYTHON_FILE_HEADER is written verbatim at the top of every generated
# module. It carries the imports, placeholder connection settings, the type
# lookup tables and the engine/session helpers that the generated classes
# call. The text must itself be valid, correctly indented Python because it
# becomes the start of the output file.
DEF_PYTHON_FILE_HEADER = """from sqlalchemy.dialects import mssql
from sqlalchemy import Column
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.engine import create_engine, URL
from sqlalchemy.orm import sessionmaker
try:
    from .config import get_config
except Exception:
    get_config = None
DB_HOSTNAME = 'DB_HOSTNAME'
DB_USERNAME = 'DB_USERNAME'
DB_PASSWORD = 'DB_PASSWORD'
column_types = {
    'bigint':mssql.BIGINT,
    'binary':mssql.BINARY,
    'bit':mssql.BIT,
    'char':mssql.CHAR,
    'date':mssql.DATE,
    'datetime':mssql.DATETIME,
    'datetime2':mssql.DATETIME2,
    'datetimeoffset':mssql.DATETIMEOFFSET,
    'decimal':mssql.DECIMAL,
    'float':mssql.FLOAT,
    'image':mssql.IMAGE,
    'int':mssql.INTEGER,
    'money':mssql.MONEY,
    'nchar':mssql.NCHAR,
    'ntext':mssql.NTEXT,
    'numeric':mssql.NUMERIC,
    'nvarchar':mssql.NVARCHAR,
    'real':mssql.REAL,
    'smalldatetime':mssql.SMALLDATETIME,
    'smallint':mssql.SMALLINT,
    'sql_variant':mssql.SQL_VARIANT,
    'text':mssql.TEXT,
    'time':mssql.TIME,
    'tinyint':mssql.TINYINT,
    'uniqueidentifier':mssql.UNIQUEIDENTIFIER,
    'varbinary':mssql.VARBINARY,
    'varchar':mssql.VARCHAR,
    'xml':mssql.XML
}
column_types_names = {
    'bigint':"mssql.BIGINT",
    'binary':"mssql.BINARY",
    'bit':"mssql.BIT",
    'char':"mssql.CHAR",
    'date':"mssql.DATE",
    'datetime':"mssql.DATETIME",
    'datetime2':"mssql.DATETIME2",
    'datetimeoffset':"mssql.DATETIMEOFFSET",
    'decimal':"mssql.DECIMAL",
    'float':"mssql.FLOAT",
    'image':"mssql.IMAGE",
    'int':"mssql.INTEGER",
    'money':"mssql.MONEY",
    'nchar':"mssql.NCHAR",
    'ntext':"mssql.NTEXT",
    'numeric':"mssql.NUMERIC",
    'nvarchar':"mssql.NVARCHAR",
    'real':"mssql.REAL",
    'smalldatetime':"mssql.SMALLDATETIME",
    'smallint':"mssql.SMALLINT",
    'sql_variant':"mssql.SQL_VARIANT",
    'text':"mssql.TEXT",
    'time':"mssql.TIME",
    'tinyint':"mssql.TINYINT",
    'uniqueidentifier':"mssql.UNIQUEIDENTIFIER",
    'varbinary':"mssql.VARBINARY",
    'varchar':"mssql.VARCHAR",
    'xml':"mssql.XML"
}
def get_engine_connection_string(database=None):
    if get_config is not None:
        config = get_config()
        username = config.get('DB_USERNAME', None)
        password = config.get('DB_PASSWORD', None)
        hostname = config.get('DB_HOSTNAME', None)
    else:
        config = globals()
        username = config.get('DB_USERNAME', None)
        password = config.get('DB_PASSWORD', None)
        hostname = config.get('DB_HOSTNAME', None)
    return URL.create("mssql+pymssql", username=username, password=password,
                      host=hostname, database=database)
def get_engine(database=None):
    connection_url = get_engine_connection_string(database=database)
    engine = create_engine(connection_url)
    with engine.connect():
        pass
    return engine
def get_session(database=None, readonly=True):
    connection_url = get_engine_connection_string(database=database)
    engine = create_engine(connection_url)
    session = None
    if readonly:
        def abort_ro(*args,**kwargs):
            raise Exception ("Read-only access only")
        sm = sessionmaker(bind=engine, autoflush=False, autocommit=False)
        session = sm()
        session.flush = abort_ro
    else:
        sm = sessionmaker(bind=engine)
        session = sm()
    return session, engine
"""
# Per-database section of the generated module: a declarative base, a constant
# holding the real database name, a session helper, then the table classes.
# Identifiers use the sanitised database name; the string constant keeps the
# real one.
DECL_DB_DESC = '''
{safe_database_name}_Base = declarative_base()
DB_{upper_database_name} = '{database_name}'
def get_{lower_database_name}_session():
    return get_session(database=DB_{upper_database_name})
{classes}
'''

# One mapped class per table. SQLAlchemy reads table options from
# ``__table_args__``; any other attribute name is ignored and the schema would
# silently be dropped. ``{schema_name}`` receives an already-quoted Python
# literal (or ``None``).
DECL_TABLE_DESC = '''class {safe_schema_name}({safe_database_name}_Base):
    __tablename__ = "{table_name}"
    __table_args__ = {{"schema": {schema_name}, "extend_existing":True}}
    {columns}
    @classmethod
    def get_session(cls):
        return get_{lower_database_name}_session()
'''

# One class attribute per column.
DECL_COLUMN_DESC = '''{column_name} = Column({column_def})'''


def _safe_identifier(name: str) -> str:
    """Return *name* rewritten so it can be used in a Python identifier.

    A leading digit is prefixed with ``_`` and each space, ``#``, ``-`` and
    ``.`` is replaced by ``_``.
    """
    if name[:1].isdigit():
        name = '_' + name
    for unwanted in (' ', '#', '-', '.'):
        name = name.replace(unwanted, '_')
    return name


def _render_columns(tcolumns: dict) -> list:
    """Return the ``attr = Column(...)`` source lines for one table.

    *tcolumns* maps an ordinal position (int, or a digit string after a JSON
    round trip) to ``{'type': ..., 'name': ...}``.
    """
    lines = []
    primary_key_set = False
    # Sort numerically: JSON turns the ordinals into strings, and a plain
    # string sort would put "10" before "2".
    for _, finfo in sorted(tcolumns.items(), key=lambda item: int(item[0])):
        column_name = finfo['name']
        # Columns whose name contains '__$' are left out of the class.
        if '__$' in column_name:
            continue
        column_args = [column_types_names[finfo['type']]]
        attr_name = _safe_identifier(column_name)
        if attr_name != column_name:
            # The attribute had to be renamed, so pass the real column name
            # explicitly; otherwise the mapping would point at a column that
            # does not exist.
            column_args.insert(0, repr(column_name))
        if not primary_key_set:
            # The ORM needs a primary key; the first emitted column (lowest
            # ordinal) is used, and only that one.
            column_args.append('primary_key=True')
            primary_key_set = True
        lines.append(DECL_COLUMN_DESC.format(column_name=attr_name,
                                             column_def=', '.join(column_args)))
    return lines


def create_python_classes(dbs_cols: dict, output_file: str):
    """Generate SQLAlchemy declarative classes and write them to a file.

    Args:
        dbs_cols: schema description shaped as
            ``{database: {"catalog.schema.table": {ordinal: {"type": ...,
            "name": ...}}}}``. Ordinals may be ints or digit strings.
        output_file: path of the Python file to create (overwritten).

    Returns:
        dict mapping each database name to the source text generated for it.

    Raises:
        KeyError: if a column type has no entry in ``column_types_names``.
    """
    decl_class = {}
    for db, info in dbs_cols.items():
        safe_db = _safe_identifier(db)
        table_sources = []
        declared_tables = set()
        for schema_name, tcolumns in info.items():
            # Everything before the last dot is the schema qualifier, the
            # remainder is the table name.
            schema, _, table = schema_name.rpartition('.')
            # Only the first table with a given bare name is emitted for a
            # database; later ones with the same name are skipped.
            if table in declared_tables:
                continue
            declared_tables.add(table)
            columns_decl = "\n    ".join(_render_columns(tcolumns))
            table_sources.append(DECL_TABLE_DESC.format(
                lower_database_name=safe_db.lower(),
                safe_database_name=safe_db,
                safe_schema_name=_safe_identifier(schema_name),
                table_name=table,
                schema_name=repr(schema) if schema else 'None',
                columns=columns_decl))
        decl_class[db] = DECL_DB_DESC.format(
            classes="\n\n".join(table_sources),
            database_name=db,
            safe_database_name=safe_db,
            lower_database_name=safe_db.lower(),
            upper_database_name=safe_db.upper())
    # Python source is UTF-8 by default, so write it as such, and make sure
    # the file is closed even if a write fails.
    with open(output_file, 'w', encoding='utf-8') as python_out:
        python_out.write(DEF_PYTHON_FILE_HEADER + '\n')
        python_out.write('# Generated Python Class files for ORM\n')
        for dbname in sorted(decl_class):
            python_out.write(f'# Generated ORM for {dbname}\n')
            python_out.write(decl_class[dbname])
    return decl_class
def get_databases(username, password, hostname, engine=None):
    """Return the names of all databases listed in ``master.dbo.sysdatabases``.

    Args:
        username, password, hostname: connection details, used only when no
            *engine* is supplied.
        engine: optional existing SQLAlchemy engine to reuse. It is left open
            for the caller.

    Returns:
        list of database names.
    """
    owns_engine = engine is None
    m_engine = engine
    if owns_engine:
        connection_url = get_engine_connection_string(username, password, hostname)
        m_engine = create_engine(connection_url)
    try:
        # Run the query on an explicit connection that is released on exit;
        # raw-string Engine.execute() no longer exists in SQLAlchemy 2.0.
        with m_engine.connect() as connection:
            rows = connection.execute(text("SELECT name FROM master.dbo.sysdatabases"))
            return [row[0] for row in rows]
    finally:
        # Only tear down an engine created here.
        if owns_engine:
            m_engine.dispose()
def get_database_schema(database, username, password, hostname):
    """Read the column layout of every table in *database*.

    Queries ``INFORMATION_SCHEMA.COLUMNS`` and groups the rows by table.

    Returns:
        dict mapping ``"TABLE_CATALOG.TABLE_SCHEMA.TABLE_NAME"`` to
        ``{ordinal_position: {'type': data_type, 'name': column_name}}``.
    """
    engine = get_engine(username, password, hostname, database)
    # Name the columns explicitly instead of relying on the positional layout
    # of SELECT *.
    query = text(
        "SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, "
        "ORDINAL_POSITION, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS"
    )
    results = {}
    try:
        with engine.connect() as connection:
            for catalog, schema, table, field, order, field_type in connection.execute(query):
                dbo = ".".join((catalog, schema, table))
                results.setdefault(dbo, {})[order] = {'type': field_type, 'name': field}
    finally:
        # Release the pooled connections; this engine is private to the call.
        engine.dispose()
    return results
def get_database_columns(username, password, hostname, engine=None, databases=None, database=None):
    """Collect the schema description of one, several or all databases.

    Args:
        username, password, hostname: connection details.
        engine: optional engine passed through to ``get_databases`` when the
            database list has to be looked up.
        databases: optional list of database names to inspect.
        database: optional single database name; takes precedence over
            *databases*.

    Returns:
        dict mapping each database name to the result of
        ``get_database_schema`` for it.
    """
    # No engine is created here: the only place one is needed is
    # get_databases, which builds (and cleans up) its own when none is given.
    if database is not None:
        databases = [database]
    if databases is None:
        databases = get_databases(username, password, hostname, engine)
    return {
        db: get_database_schema(db, username, password, hostname)
        for db in databases
    }
def get_engine_connection_string(username, password, hostname, database=None):
    """Build the ``mssql+pymssql`` connection URL for the given credentials.

    Despite the name, the value returned is whatever ``URL.create`` produces,
    not a plain string.
    """
    url_fields = {
        "username": username,
        "password": password,
        "host": hostname,
        "database": database,
    }
    return URL.create("mssql+pymssql", **url_fields)
def get_engine(username, password, hostname, database=None):
    """Create an engine for the given server/database and check it connects.

    Returns:
        the SQLAlchemy engine. Any connection error is raised from here.
    """
    connection_url = get_engine_connection_string(username, password, hostname, database)
    engine = create_engine(connection_url)
    # Open one connection to fail fast on bad credentials, then release it
    # instead of leaving it checked out.
    with engine.connect():
        pass
    return engine
if __name__ == "__main__":
    # Script entry point: obtain the schema description (live from the server
    # or from a previously saved JSON file), then generate the ORM module.
    args = parser.parse_args()
    db_cols = None
    have_credentials = all(
        value is not None
        for value in (args.hostname, args.username, args.password)
    )
    if args.enum and have_credentials:
        db_cols = get_database_columns(args.username, args.password, args.hostname)
        # Save the enumerated schema next to the output so it can be fed back
        # through -input later. Only a trailing ".py" is stripped.
        json_base = args.output[:-3] if args.output.endswith('.py') else args.output
        with open(json_base + '.json', 'w') as json_out:
            json_out.write(json.dumps(db_cols, indent=4))
    elif args.input is not None:
        with open(args.input, 'r') as json_in:
            db_cols = json.loads(json_in.read())
    if db_cols is not None:
        create_python_classes(db_cols, args.output)
    else:
        raise Exception("Unable to create ORM code because the DB info is missing")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment