Skip to content

Instantly share code, notes, and snippets.

@itdependsnetworks
Created September 12, 2018 11:52
Show Gist options
  • Save itdependsnetworks/0871d54c1afe4765ffc91aab3c452042 to your computer and use it in GitHub Desktop.
Save itdependsnetworks/0871d54c1afe4765ffc91aab3c452042 to your computer and use it in GitHub Desktop.
Example of creating custom fields in sqlalchemy.
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, relationship, backref, sessionmaker
from sqlalchemy import create_engine, Column, String, Integer, ForeignKey, Boolean
from sqlalchemy import MetaData
from sqlalchemy.orm import create_session
from sqlalchemy.ext.automap import automap_base
import random, string
def randomword(length):
letters = string.ascii_uppercase
return ''.join(random.choice(letters) for i in range(length))
def get_class_by_tablename(tablename):
"""Return class reference mapped to table.
:param tablename: String with name of table.
:return: Class reference or None.
"""
for c in Base._decl_class_registry.values():
if hasattr(c, '__tablename__') and c.__tablename__ == tablename:
return c
def to_tuple(class_obj, custom_list):
my_tuple = ()
for item in custom_list:
my_tuple = my_tuple + (getattr(class_obj, item),)
return my_tuple
def create_column(column, engine, actual):
class_obj = get_class_by_tablename(column.table_name)
if column.column_type == 'String':
column_obj = Column(String(50), nullable=True)
alter_name = 'VARCHAR'
elif column.column_type == 'Integer':
column_obj = Column(Integer, nullable=True)
alter_name = 'INTEGER'
if not hasattr(class_obj, column.column_name):
setattr(class_obj, column.column_name, column_obj)
actual_table = Actual.classes.get(column.table_name)
if not hasattr(actual_table, column.column_name):
engine.execute('ALTER TABLE {} ADD {} {}'.format(column.table_name, column.column_name, alter_name))
def get_active_columns(cust_cols, engine, actual):
active_custom_columns = {}
for custom_column in cust_cols:
create_column(custom_column, engine, Actual)
if not active_custom_columns.get(custom_column.table_name):
active_custom_columns[custom_column.table_name] = []
active_custom_columns[custom_column.table_name].append(custom_column.column_name)
return active_custom_columns
def precheck(engine):
TempBase.metadata.create_all(engine)
presession = sessionmaker(bind=engine)
ses = presession()
return ses.query(TempCustomFields).all()
TempBase = declarative_base()
class TempCustomFields(TempBase):
__tablename__ = 'custom_fields'
custom_field_id = Column(Integer, primary_key=True, nullable=False)
table_name = Column(String(50), nullable=False)
column_name = Column(String(50), nullable=False)
column_type = Column(String(50), nullable=False)
column_nullable = Column(Boolean, nullable=False, default=True)
engine = create_engine('sqlite:///custom_fields.db')
engine.execute('pragma foreign_keys=on')
cust_cols = precheck(engine)
Actual = automap_base()
Actual.prepare(engine, reflect=True)
session = create_session(bind=engine)
Base = declarative_base()
class CustomFields(Base):
__tablename__ = 'custom_fields'
custom_field_id = Column(Integer, primary_key=True, nullable=False)
table_name = Column(String(50), nullable=False)
column_name = Column(String(50), nullable=False)
column_type = Column(String(50), nullable=False)
column_nullable = Column(Boolean, nullable=False, default=True)
class State(Base):
__tablename__ = 'state'
state_id = Column(Integer, primary_key=True, nullable=False)
name = Column(String(50), nullable=False)
active_custom_columns = get_active_columns(cust_cols, engine, Actual)
engine = create_engine('sqlite:///custom_fields.db')
engine.execute('pragma foreign_keys=on')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)
s = session()
if not hasattr(State, 'region'):
region = CustomFields(table_name="state", column_name='region', column_type='String', column_nullable=True)
s.add(region)
new_state = State(name=randomword(2))
s.add(new_state)
elif not hasattr(State, 'postal_code'):
postal_code = CustomFields(table_name="state", column_name='postal_code', column_type='Integer', column_nullable=True)
s.add(postal_code)
new_state = State(name=randomword(2), region='North')
s.add(new_state)
else:
new_state = State(name=randomword(2), region='North', postal_code=7)
s.add(new_state)
s.commit()
for state in s.query(State).all():
state_string = str("name: {}".format(state.name))
for cust_col in active_custom_columns.get(State.__tablename__, []):
state_string = str("{}, {}: {}".format(state_string, cust_col, getattr(state, cust_col)))
print state_string
print "------------------"
tuple_creator = to_tuple(State, active_custom_columns.get(State.__tablename__, []))
for row in s.query(State.name, *tuple_creator).all():
state_string = str("name: {}".format(row.name))
for cust_col in active_custom_columns.get(State.__tablename__, []):
state_string = str("{}, {}: {}".format(state_string, cust_col, getattr(row, cust_col)))
print state_string
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment