Skip to content

Instantly share code, notes, and snippets.

@getadeo
Forked from spinningD20/alembic.ini
Created March 9, 2016 14:18
Show Gist options
  • Save getadeo/35f4c38302c43e7cf233 to your computer and use it in GitHub Desktop.
Save getadeo/35f4c38302c43e7cf233 to your computer and use it in GitHub Desktop.
migrate include_objects problem
[excluded]
tables = ProjectInfo, AddressTypes, AddressTypes_tracking, ClientAddresses_tracking, ClientContacts, ClientContacts_tracking, ClientTypes, ClientAddresses, Clients, Clients_tracking, ContactGroups, ContactGroups_tracking, Contacts, Contacts_tracking
from __future__ import with_statement
from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
from flask import current_app
config.set_main_option('sqlalchemy.url', current_app.config.get('SQLALCHEMY_DATABASE_URI'))
target_metadata = current_app.extensions['migrate'].db.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def exclude_tables_from_config(config_):
tables_ = config_.get("tables", None)
if tables_ is not None:
tables = tables_.split(", ")
return tables
else:
return []
exclude_tables = exclude_tables_from_config(config.get_section('excluded'))
def include_object(object, name, type_, reflected, compare_to):
if type_ == "unique_constraint":
print('+++++++++++++++++',dir(object))
if type_ == "table" and name in exclude_tables:
print('-----------------excluding----------------', name)
return False
else:
return True
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(url=url,
include_object = include_object)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
engine = engine_from_config(
config.get_section(config.config_ini_section),
prefix='sqlalchemy.',
poolclass=pool.NullPool)
connection = engine.connect()
context.configure(
connection=connection,
target_metadata=target_metadata,
include_object = include_object
)
try:
with context.begin_transaction():
context.run_migrations()
finally:
connection.close()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
class Project(Model):
__tablename__ = 'ProjectInfo'
__bind_key__ = 'third_party_db'
Id = Column(UNIQUEIDENTIFIER, primary_key=True)
ClientId = Column(UNIQUEIDENTIFIER, nullable=False, index=True)
Client = Column(Unicode(150), nullable=False, index=True)
Name = Column(Unicode(150), nullable=False)
Filename = Column(Unicode(160), nullable=False)
Folder = Column(Unicode(250), nullable=False, unique=True)
CheckOutMachine = Column(Unicode(50), nullable=False, server_default=text("('')"))
CheckOutUserName = Column(Unicode(100), nullable=False, server_default=text("('')"))
CheckInDateTime = Column(DateTime, nullable=False)
Deleted = Column(BIT, nullable=False, server_default=text("((0))"))
Archived = Column(BIT, nullable=False, server_default=text("((0))"))
Number = Column(Unicode(150))
Progress = Column(Unicode(50), index=True)
TotalPrice = Column(MONEY, nullable=False, server_default=text("((0))"))
Revision = Column(SmallInteger, nullable=False, server_default=text("((0))"))
StartDate = Column(DateTime)
EndDate = Column(DateTime)
Resources = Column(Unicode(2500))
CreatedOn = Column(DateTime, nullable=False)
UpdatedOn = Column(DateTime, nullable=False)
Notes = Column(Unicode(2000))
AcctgEstNumber = Column(Unicode(500))
SIClientId = Column(UNIQUEIDENTIFIER, nullable=False, index=True)
AssignedToId = Column(UNIQUEIDENTIFIER, index=True)
AssignedTo = Column(Unicode(150))
CustomField1 = Column(Unicode(300))
CustomField2 = Column(Unicode(300))
CustomField3 = Column(Unicode(300))
CustomField4 = Column(Unicode(300))
CustomField5 = Column(Unicode(300))
CustomField6 = Column(Unicode(300))
CustomField7 = Column(Unicode(300))
CustomField8 = Column(Unicode(300))
CustomField9 = Column(Unicode(1000))
CustomField10 = Column(Unicode(1000))
worksheets = relationship('Worksheet')
tasks = relationship('Task')
times = relationship('TaskTime')
parts = relationship('Part')
class Worksheet(Model):
__tablename__ = 'worksheets'
id = Column(Integer, primary_key=True)
manager = Column(Integer, ForeignKey('users.id'))
type = Column(String(10))
project_id = Column(UNIQUEIDENTIFIER, ForeignKey('ProjectInfo.Id'))
date = Column(DateTime())
billable = Column(Boolean)
completed = Column(Boolean)
title = Column(String(500))
notes = Column(String(1000))
tasks = relationship('WorksheetTaskMapping')
assignees = relationship('WorksheetAssignment')
project = relationship('Project')
def __init__(self, manager=None, type='standard', project_id=None, date=None, billable=True, completed=False, title='', notes=''):
self.manager = manager
self.type = type
self.project_id = project_id
self.date = date
self.billable = billable
self.completed = completed
self.title = title
self.notes = notes
def assign_to(self, assignee):
existing = WorksheetAssignment.query.filter_by(sheet_id=self.id).filter_by(user_id=assignee).first()
if existing:
print('existing assignment', existing)
pass
else:
print('adding new assignment')
assignment = WorksheetAssignment()
assignment.user_id = assignee
assignment.sheet_id = self.id
db.session.add(assignment)
db.session.commit()
def unassign_from(self, assignee):
existing = Worksheet.query.filter_by(id=self.id).filter_by(user_id=assignee).first()
if existing:
db.session.remove(existing)
db.session.commit()
else:
pass
def assign_tasks(self, task_list):
if len(self.tasks) > 0:
tasks_to_assign = [task for task in task_list if task not in self.tasks]
task_list = tasks_to_assign
else:
print('adding new assignment')
mapping = WorksheetTaskMapping()
mapping.task_id = self.id
mapping.sheet_id = self.id
db.session.add(mapping)
db.session.commit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment