Last active
November 15, 2021 06:18
-
-
Save deeso/459bcd5221684f4f836402301f0aa48d to your computer and use it in GitHub Desktop.
Read all MSSQL database schemas and generate ORM code for them [untested but mostly works]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# requires pymssql, sqlalchemy | |
import sys | |
import argparse | |
import pymssql | |
import json | |
from datetime import datetime, timedelta | |
from sqlalchemy.engine import create_engine, URL | |
from sqlalchemy.orm import sessionmaker | |
from sqlalchemy.dialects import mssql | |
from sqlalchemy import Column | |
from sqlalchemy.ext.declarative import declarative_base | |
# Command-line interface of the generator. Each entry pairs an option flag
# with the keyword arguments handed to ``add_argument`` for it.
_CLI_OPTIONS = (
    ('-input', {
        'type': str,
        'default': None,
        'help': 'read previously enumerate ORM Data',
    }),
    ('-output', {
        'type': str,
        'default': 'generated_orm.py',
        'help': 'write generated ORM code',
    }),
    ('-enum', {
        'action': "store_true",
        'default': False,
        'help': 'enumerate databases and tables from database',
    }),
    ('-hostname', {
        'default': None,
        'help': 'hostname with databases and tables enumerate from',
    }),
    ('-username', {
        'default': None,
        'help': 'username for account for database and tables enumerate from',
    }),
    ('-password', {
        'default': None,
        'help': 'password for account for database and tables enumerate from',
    }),
)
parser = argparse.ArgumentParser(description='Generate some ORM code from MSSQL Databases.')
for _flag, _flag_kwargs in _CLI_OPTIONS:
    parser.add_argument(_flag, **_flag_kwargs)
# Maps a SQL Server type name to the matching SQLAlchemy MSSQL dialect type.
column_types = dict(
    bigint=mssql.BIGINT,
    binary=mssql.BINARY,
    bit=mssql.BIT,
    char=mssql.CHAR,
    date=mssql.DATE,
    datetime=mssql.DATETIME,
    datetime2=mssql.DATETIME2,
    datetimeoffset=mssql.DATETIMEOFFSET,
    decimal=mssql.DECIMAL,
    float=mssql.FLOAT,
    image=mssql.IMAGE,
    int=mssql.INTEGER,
    money=mssql.MONEY,
    nchar=mssql.NCHAR,
    ntext=mssql.NTEXT,
    numeric=mssql.NUMERIC,
    nvarchar=mssql.NVARCHAR,
    real=mssql.REAL,
    smalldatetime=mssql.SMALLDATETIME,
    smallint=mssql.SMALLINT,
    sql_variant=mssql.SQL_VARIANT,
    text=mssql.TEXT,
    time=mssql.TIME,
    tinyint=mssql.TINYINT,
    uniqueidentifier=mssql.UNIQUEIDENTIFIER,
    varbinary=mssql.VARBINARY,
    varchar=mssql.VARCHAR,
    xml=mssql.XML,
)
# Maps a SQL Server type name to the source-code spelling of its dialect type
# (for example "mssql.BIGINT"), which is what gets written into generated code.
# Every name is the upper-cased key except 'int', which is spelled INTEGER.
_SQL_TYPE_KEYS = (
    'bigint', 'binary', 'bit', 'char', 'date', 'datetime', 'datetime2',
    'datetimeoffset', 'decimal', 'float', 'image', 'int', 'money', 'nchar',
    'ntext', 'numeric', 'nvarchar', 'real', 'smalldatetime', 'smallint',
    'sql_variant', 'text', 'time', 'tinyint', 'uniqueidentifier',
    'varbinary', 'varchar', 'xml',
)
column_types_names = {
    sql_type: "mssql." + ("INTEGER" if sql_type == 'int' else sql_type.upper())
    for sql_type in _SQL_TYPE_KEYS
}
# Templates used to generate the declarative ORM module.
#
# DEF_PYTHON_FILE_HEADER is written verbatim at the top of the generated file.
# It carries the imports, placeholder connection settings, the type lookup
# tables and the engine/session helpers that the generated classes call.
# The text must itself be valid Python, so its indentation is significant.
DEF_PYTHON_FILE_HEADER = """from sqlalchemy.dialects import mssql
from sqlalchemy import Column
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.engine import create_engine, URL
from sqlalchemy.orm import sessionmaker
try:
    from .config import get_config
except ImportError:
    get_config = None
DB_HOSTNAME = 'DB_HOSTNAME'
DB_USERNAME = 'DB_USERNAME'
DB_PASSWORD = 'DB_PASSWORD'
column_types = {
    'bigint':mssql.BIGINT,
    'binary':mssql.BINARY,
    'bit':mssql.BIT,
    'char':mssql.CHAR,
    'date':mssql.DATE,
    'datetime':mssql.DATETIME,
    'datetime2':mssql.DATETIME2,
    'datetimeoffset':mssql.DATETIMEOFFSET,
    'decimal':mssql.DECIMAL,
    'float':mssql.FLOAT,
    'image':mssql.IMAGE,
    'int':mssql.INTEGER,
    'money':mssql.MONEY,
    'nchar':mssql.NCHAR,
    'ntext':mssql.NTEXT,
    'numeric':mssql.NUMERIC,
    'nvarchar':mssql.NVARCHAR,
    'real':mssql.REAL,
    'smalldatetime':mssql.SMALLDATETIME,
    'smallint':mssql.SMALLINT,
    'sql_variant':mssql.SQL_VARIANT,
    'text':mssql.TEXT,
    'time':mssql.TIME,
    'tinyint':mssql.TINYINT,
    'uniqueidentifier':mssql.UNIQUEIDENTIFIER,
    'varbinary':mssql.VARBINARY,
    'varchar':mssql.VARCHAR,
    'xml':mssql.XML
}
column_types_names = {
    'bigint':"mssql.BIGINT",
    'binary':"mssql.BINARY",
    'bit':"mssql.BIT",
    'char':"mssql.CHAR",
    'date':"mssql.DATE",
    'datetime':"mssql.DATETIME",
    'datetime2':"mssql.DATETIME2",
    'datetimeoffset':"mssql.DATETIMEOFFSET",
    'decimal':"mssql.DECIMAL",
    'float':"mssql.FLOAT",
    'image':"mssql.IMAGE",
    'int':"mssql.INTEGER",
    'money':"mssql.MONEY",
    'nchar':"mssql.NCHAR",
    'ntext':"mssql.NTEXT",
    'numeric':"mssql.NUMERIC",
    'nvarchar':"mssql.NVARCHAR",
    'real':"mssql.REAL",
    'smalldatetime':"mssql.SMALLDATETIME",
    'smallint':"mssql.SMALLINT",
    'sql_variant':"mssql.SQL_VARIANT",
    'text':"mssql.TEXT",
    'time':"mssql.TIME",
    'tinyint':"mssql.TINYINT",
    'uniqueidentifier':"mssql.UNIQUEIDENTIFIER",
    'varbinary':"mssql.VARBINARY",
    'varchar':"mssql.VARCHAR",
    'xml':"mssql.XML"
}
def get_engine_connection_string(database=None):
    if get_config is not None:
        config = get_config()
        username = config.get('DB_USERNAME', None)
        password = config.get('DB_PASSWORD', None)
        hostname = config.get('DB_HOSTNAME', None)
    else:
        config = globals()
        username = config.get('DB_USERNAME', None)
        password = config.get('DB_PASSWORD', None)
        hostname = config.get('DB_HOSTNAME', None)
    return URL.create("mssql+pymssql", username=username, password=password,
                      host=hostname, database=database)
def get_engine(database=None):
    connection_url = get_engine_connection_string(database=database)
    engine = create_engine(connection_url)
    engine.connect()
    return engine
def get_session(database=None, readonly=True):
    connection_url = get_engine_connection_string(database=database)
    engine = create_engine(connection_url)
    session = None
    if readonly:
        def abort_ro(*args,**kwargs):
            raise Exception ("Read-only access only")
        sm = sessionmaker(bind=engine, autoflush=False, autocommit=False)
        session = sm()
        session.flush = abort_ro
    else:
        sm = sessionmaker(bind=engine)
        session = sm()
    return session, engine
"""
# Per-database section of the generated module: a declarative base, a constant
# holding the database name, a session helper bound to that database, and then
# the rendered table classes ({classes}). The body of the generated helper must
# be indented for the output to be valid Python.
DECL_DB_DESC = '''
{database_name}_Base = declarative_base()
DB_{upper_database_name} = '{database_name}'
def get_{lower_database_name}_session():
    return get_session(database=DB_{upper_database_name})
{classes}
'''
# Source template for one mapped table class. SQLAlchemy reads table-level
# options (here the schema) from ``__table_args__``. The {columns} placeholder
# sits at class-body indentation, and create_python_classes joins the column
# lines with that same indentation.
DECL_TABLE_DESC = '''class {safe_schema_name}({database_name}_Base):
    __tablename__ = "{table_name}"
    __table_args__ = {{"schema": '{schema_name}', "extend_existing":True}}
    {columns}
    @classmethod
    def get_session(cls):
        return get_{lower_database_name}_session()
'''
# Source template for a single column attribute inside a table class.
DECL_COLUMN_DESC = '''{column_name} = Column({column_def})'''

# Characters that may appear in SQL identifiers but not in Python identifiers.
_UNSAFE_COLUMN_CHARS = str.maketrans({' ': '_', '#': '_', '-': '_'})
_UNSAFE_CLASS_CHARS = str.maketrans({' ': '_', '#': '_', '-': '_', '.': '_'})


def _safe_column_name(name: str) -> str:
    """Return *name* rewritten so it can be used as a Python attribute name."""
    safe = name.translate(_UNSAFE_COLUMN_CHARS)
    if safe[:1].isdigit():
        safe = '_' + safe
    return safe


def create_python_classes(dbs_cols: dict, output_file: str):
    '''
    Render SQLAlchemy declarative classes for every database and write them
    to a Python file.

    @dbs_cols: schema export of the server, shaped as
        {database: {"catalog.schema.table": {ordinal: {"type": ..., "name": ...}}}}.
        Ordinals may be ints (fresh enumeration) or strings (reloaded JSON).
    @output_file: path of the python file to create
    @returns: dict mapping each database name to its generated source text
    @raises KeyError: when a column type has no entry in column_types_names
    '''
    decl_class = {}
    for db, info in dbs_cols.items():
        table_sources = []
        # Only the first table seen for a given bare table name is emitted.
        declared_tables = set()
        for schema_name, tcolumns in info.items():
            schema, _, table = schema_name.rpartition('.')
            if table in declared_tables:
                continue
            declared_tables.add(table)

            columns = []
            set_primary_key = False
            # Sort numerically: as strings, ordinal "10" would sort before "2".
            ordered = sorted(tcolumns.items(), key=lambda item: int(item[0]))
            for _, finfo in ordered:
                sql_type = finfo['type']
                name = finfo['name']
                # Columns whose name contains '__$' are left out of the class.
                if '__$' in name:
                    continue
                try:
                    cls_name = column_types_names[sql_type]
                except KeyError:
                    raise KeyError(
                        f"unsupported column type {sql_type!r} "
                        f"for {schema_name}.{name}") from None
                column_def = cls_name
                # The export carries no key information, so the first emitted
                # column is used as the primary key the ORM mapper requires.
                if not set_primary_key:
                    column_def += ', primary_key=True'
                    set_primary_key = True
                safe_name = _safe_column_name(name)
                if safe_name != name:
                    # Keep the mapping pointed at the real database column
                    # when the attribute had to be renamed.
                    column_def = '{!r}, {}'.format(name, column_def)
                columns.append(DECL_COLUMN_DESC.format(
                    column_name=safe_name, column_def=column_def))

            # Continuation lines get the indentation {columns} has in the template.
            columns_decl = "\n    ".join(columns)
            table_sources.append(DECL_TABLE_DESC.format(
                lower_database_name=db.lower(),
                database_name=db,
                safe_table_name=table.replace('-', '_').replace('#', '_'),
                safe_schema_name=schema_name.translate(_UNSAFE_CLASS_CHARS),
                table_name=table,
                schema_name=schema,
                columns=columns_decl))

        decl_class[db] = DECL_DB_DESC.format(
            classes="\n\n".join(table_sources),
            database_name=db,
            lower_database_name=db.lower(),
            upper_database_name=db.upper())

    with open(output_file, 'w') as python_out:
        python_out.write(DEF_PYTHON_FILE_HEADER + '\n')
        python_out.write('# Generated Python Class files for ORM\n')
        for dbname in sorted(decl_class):
            python_out.write(f'# Generated ORM for {dbname}\n')
            python_out.write(decl_class[dbname])
    return decl_class
def get_databases(username, password, hostname, engine=None):
    """Return the names of the databases listed in master.dbo.sysdatabases.

    @username, @password, @hostname: credentials/host used to build an engine
        when none is supplied
    @engine: optional existing SQLAlchemy engine to query; it is left open
        for the caller
    @returns: list of database names
    """
    owns_engine = engine is None
    m_engine = engine
    if owns_engine:
        connection_url = get_engine_connection_string(username, password, hostname)
        m_engine = create_engine(connection_url)
    try:
        # A scoped connection is returned to the pool as soon as the query is
        # read, instead of being left open for the garbage collector.
        with m_engine.connect() as connection:
            rows = connection.exec_driver_sql("SELECT name FROM master.dbo.sysdatabases")
            return [row[0] for row in rows]
    finally:
        # Only tear down an engine that was created here.
        if owns_engine:
            m_engine.dispose()
def get_database_schema(database, username, password, hostname):
    """Collect the column layout of every table in one database.

    @database: name of the database to connect to
    @username, @password, @hostname: connection credentials/host
    @returns: dict keyed by "catalog.schema.table"; each value maps a column's
        ordinal position to {'type': <data type>, 'name': <column name>}
    """
    engine = get_engine(username, password, hostname, database)
    results = {}
    try:
        with engine.connect() as connection:
            # Name the columns explicitly rather than relying on the position
            # of each field in a SELECT * result.
            rows = connection.exec_driver_sql(
                "SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, "
                "ORDINAL_POSITION, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS")
            for catalog, schema, table, field, order, field_type in rows:
                dbo = ".".join((catalog, schema, table))
                results.setdefault(dbo, {})[order] = {'type': field_type, 'name': field}
    finally:
        # The engine is private to this call, so release its pool when done.
        engine.dispose()
    return results
def get_database_columns(username, password, hostname, engine=None, databases=None, database=None):
    """Collect the column layout of one, several, or all databases on a server.

    @username, @password, @hostname: connection credentials/host
    @engine: optional engine forwarded to get_databases when the database
        list has to be looked up
    @databases: optional list of database names to inspect
    @database: optional single database name; takes precedence over @databases
    @returns: dict mapping each database name to the result of
        get_database_schema for it
    """
    # No engine is created here: each helper that needs a connection opens
    # (and closes) its own, so nothing is left dangling by this function.
    if database is not None:
        databases = [database]
    if databases is None:
        databases = get_databases(username, password, hostname, engine)
    return {
        db: get_database_schema(db, username, password, hostname)
        for db in databases
    }
def get_engine_connection_string(username, password, hostname, database=None):
    """Build the SQLAlchemy URL for the "mssql+pymssql" driver.

    @username, @password: account credentials
    @hostname: server host
    @database: optional database name to include in the URL
    @returns: the object produced by URL.create
    """
    url_fields = {
        'username': username,
        'password': password,
        'host': hostname,
        'database': database,
    }
    return URL.create("mssql+pymssql", **url_fields)
def get_engine(username, password, hostname, database=None):
    """Create an engine for the given server/database and check it can connect.

    @username, @password, @hostname: connection credentials/host
    @database: optional database name
    @returns: the created SQLAlchemy engine
    """
    connection_url = get_engine_connection_string(username, password, hostname, database)
    engine = create_engine(connection_url)
    # Connect once so connection problems surface here, then hand the
    # connection straight back to the pool instead of leaving it open.
    with engine.connect():
        pass
    return engine
if __name__ == "__main__":
    # Script entry point: obtain the schema description either from a live
    # server (-enum with hostname/username/password) or from a saved JSON file
    # (-input), then generate the ORM module at -output.
    args = parser.parse_args()
    db_cols = None
    have_credentials = all(
        value is not None
        for value in (args.hostname, args.username, args.password)
    )
    if args.enum and have_credentials:
        db_cols = get_database_columns(args.username, args.password, args.hostname)
        # Save the raw enumeration next to the output, swapping only a
        # trailing ".py" for ".json".
        json_base = args.output
        if json_base.endswith('.py'):
            json_base = json_base[:-len('.py')]
        with open(json_base + '.json', 'w') as json_out:
            json_out.write(json.dumps(db_cols, indent=4))
    elif args.input is not None:
        with open(args.input, 'r') as json_in:
            db_cols = json.loads(json_in.read())
    if db_cols is None:
        raise Exception("Unable to create ORM code because the DB info is missing")
    create_python_classes(db_cols, args.output)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment