Created
September 12, 2018 11:52
-
-
Save itdependsnetworks/0871d54c1afe4765ffc91aab3c452042 to your computer and use it in GitHub Desktop.
Example of creating custom fields in SQLAlchemy.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sqlalchemy.ext.declarative import declarative_base | |
from sqlalchemy.orm import Session, relationship, backref, sessionmaker | |
from sqlalchemy import create_engine, Column, String, Integer, ForeignKey, Boolean | |
from sqlalchemy import MetaData | |
from sqlalchemy.orm import create_session | |
from sqlalchemy.ext.automap import automap_base | |
import random, string | |
def randomword(length):
    """Return a random string made of ``length`` uppercase ASCII letters.

    :param length: Number of characters to generate.
    :return: The generated string (empty when ``length`` is 0).
    """
    alphabet = string.ascii_uppercase
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)
def get_class_by_tablename(tablename):
    """Look up the declarative class mapped to a table.

    :param tablename: String with name of table.
    :return: The first class registered on ``Base`` whose ``__tablename__``
        equals ``tablename``, or None when nothing matches.
    """
    # NOTE(review): ``_decl_class_registry`` is a private attribute; confirm
    # it is still available in the installed SQLAlchemy version.
    registered = Base._decl_class_registry.values()
    matches = (
        mapped
        for mapped in registered
        if hasattr(mapped, '__tablename__')
        and mapped.__tablename__ == tablename
    )
    # Registry entries lacking a ``__tablename__`` are skipped; the first
    # match wins and None signals that no class is mapped to the table.
    return next(matches, None)
def to_tuple(class_obj, custom_list):
    """Collect the named attributes of ``class_obj`` into a tuple.

    :param class_obj: Object (here a mapped class) to read attributes from.
    :param custom_list: Iterable of attribute names, in the desired order.
    :return: Tuple of the attribute values, in the same order as the names.
    :raises AttributeError: If ``class_obj`` lacks one of the names.
    """
    # A single pass avoids re-copying a growing tuple for every item.
    return tuple(getattr(class_obj, name) for name in custom_list)
def create_column(column, engine, actual):
    """Attach a custom column to its mapped class and to the database table.

    The column is always created nullable; ``column.column_nullable`` is not
    consulted.

    :param column: Custom-field record providing ``table_name``,
        ``column_name`` and ``column_type`` ('String' or 'Integer').
    :param engine: Engine used to run the ``ALTER TABLE`` statement.
    :param actual: Reflected automap base describing the existing schema.
    :raises ValueError: If ``column.column_type`` is not a supported type.
    """
    # Validate first so an unknown type fails with a clear message instead of
    # an unbound-variable error further down.
    if column.column_type not in ('String', 'Integer'):
        raise ValueError(
            'Unsupported column_type {!r} for {}.{}'.format(
                column.column_type, column.table_name, column.column_name))

    if column.column_type == 'String':
        column_obj = Column(String(50), nullable=True)
        alter_name = 'VARCHAR'
    else:
        column_obj = Column(Integer, nullable=True)
        alter_name = 'INTEGER'

    # Add the attribute to the declarative model unless it is already there.
    class_obj = get_class_by_tablename(column.table_name)
    if not hasattr(class_obj, column.column_name):
        setattr(class_obj, column.column_name, column_obj)

    # Use the automap base handed in by the caller rather than a module
    # global, and only alter the table when the reflected schema lacks the
    # column.
    actual_table = actual.classes.get(column.table_name)
    if not hasattr(actual_table, column.column_name):
        # Identifiers cannot be bound as parameters, so let the dialect quote
        # them where needed before they are placed into the statement.
        quote = engine.dialect.identifier_preparer.quote
        engine.execute('ALTER TABLE {} ADD {} {}'.format(
            quote(column.table_name), quote(column.column_name), alter_name))
def get_active_columns(cust_cols, engine, actual):
    """Create every custom column and index the column names by table.

    :param cust_cols: Iterable of custom-field records, each providing
        ``table_name`` and ``column_name``.
    :param engine: Engine forwarded to ``create_column``.
    :param actual: Reflected automap base forwarded to ``create_column``.
    :return: Dict mapping each table name to the list of its custom column
        names, in the order the records were encountered.
    """
    active_custom_columns = {}
    for custom_column in cust_cols:
        # Forward the caller's automap base instead of the module-level one.
        create_column(custom_column, engine, actual)
        active_custom_columns.setdefault(
            custom_column.table_name, []).append(custom_column.column_name)
    return active_custom_columns
def precheck(engine):
    """Ensure the custom-field table exists and return all of its rows.

    :param engine: Engine bound to the target database.
    :return: List of ``TempCustomFields`` instances, one per stored row.
    """
    TempBase.metadata.create_all(engine)
    presession = sessionmaker(bind=engine)
    ses = presession()
    try:
        return ses.query(TempCustomFields).all()
    finally:
        # Release the session's connection; the rows were already loaded by
        # ``all()`` above.
        ses.close()
# Separate declarative base, used by ``precheck`` to create and read the
# custom-field table before the main models are declared.
TempBase = declarative_base()


class TempCustomFields(TempBase):
    """Bootstrap mapping of the ``custom_fields`` table.

    Each row names a table, a column to add to it, and the column's type.
    """

    __tablename__ = 'custom_fields'

    custom_field_id = Column(
        Integer,
        nullable=False,
        primary_key=True,
    )
    table_name = Column(String(50), nullable=False)
    column_name = Column(String(50), nullable=False)
    column_type = Column(String(50), nullable=False)
    column_nullable = Column(
        Boolean,
        default=True,
        nullable=False,
    )
# Open the SQLite database file used by the demo.
engine = create_engine('sqlite:///custom_fields.db')
# NOTE(review): SQLite applies this pragma per connection; confirm that it
# reaches the connections used by later statements.
engine.execute('pragma foreign_keys=on')

# Load the stored custom-field definitions (creating their table if needed).
cust_cols = precheck(engine)

# Reflect the schema as it currently exists in the database.
Actual = automap_base()
Actual.prepare(engine, reflect=True)

# NOTE(review): ``session`` is rebound to a sessionmaker further down the
# file; this instance does not appear to be used in between.
session = create_session(bind=engine)

# Declarative base for the application models that follow.
Base = declarative_base()
class CustomFields(Base):
    """Model of the ``custom_fields`` table on the application base.

    Each row names a table, a column to add to it, and the column's type.
    """

    __tablename__ = 'custom_fields'

    custom_field_id = Column(
        Integer,
        nullable=False,
        primary_key=True,
    )
    table_name = Column(String(50), nullable=False)
    column_name = Column(String(50), nullable=False)
    column_type = Column(String(50), nullable=False)
    column_nullable = Column(
        Boolean,
        default=True,
        nullable=False,
    )
class State(Base):
    """Model of the ``state`` table; custom columns may be attached at runtime."""

    __tablename__ = 'state'

    state_id = Column(
        Integer,
        nullable=False,
        primary_key=True,
    )
    name = Column(String(50), nullable=False)
def _format_state(record, custom_columns):
    """Render ``record`` as 'name: X, column: value, ...' for printing."""
    parts = ["name: {}".format(record.name)]
    for cust_col in custom_columns:
        parts.append("{}: {}".format(cust_col, getattr(record, cust_col)))
    return ", ".join(parts)


# Attach the stored custom columns to the models (and tables) before use.
active_custom_columns = get_active_columns(cust_cols, engine, Actual)

engine = create_engine('sqlite:///custom_fields.db')
engine.execute('pragma foreign_keys=on')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)
s = session()

# Each run advances the demo by one step. A custom field registered here only
# becomes a model attribute on the next run, because columns are attached at
# start-up from the rows already stored in ``custom_fields``.
if not hasattr(State, 'region'):
    region = CustomFields(table_name="state", column_name='region',
                          column_type='String', column_nullable=True)
    s.add(region)
    new_state = State(name=randomword(2))
elif not hasattr(State, 'postal_code'):
    postal_code = CustomFields(table_name="state", column_name='postal_code',
                               column_type='Integer', column_nullable=True)
    s.add(postal_code)
    new_state = State(name=randomword(2), region='North')
else:
    new_state = State(name=randomword(2), region='North', postal_code=7)
s.add(new_state)
s.commit()

# Custom columns registered for the state table (empty when there are none).
state_columns = active_custom_columns.get(State.__tablename__, [])

# First pass: load full State objects and read the custom attributes off them.
for state in s.query(State).all():
    state_string = _format_state(state, state_columns)
    print(state_string)

print("------------------")

# Second pass: select the name plus each custom column explicitly.
tuple_creator = to_tuple(State, state_columns)
for row in s.query(State.name, *tuple_creator).all():
    state_string = _format_state(row, state_columns)
    print(state_string)

s.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment