Skip to content

Instantly share code, notes, and snippets.

@spinningD20
Last active March 9, 2016 14:18
Show Gist options
  • Save spinningD20/b49fcc41cdce7519a358 to your computer and use it in GitHub Desktop.
Save spinningD20/b49fcc41cdce7519a358 to your computer and use it in GitHub Desktop.
migrate include_objects problem
[excluded]
tables = ProjectInfo, AddressTypes, AddressTypes_tracking, ClientAddresses_tracking, ClientContacts, ClientContacts_tracking, ClientTypes, ClientAddresses, Clients, Clients_tracking, ContactGroups, ContactGroups_tracking, Contacts, Contacts_tracking
from __future__ import with_statement
from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
from flask import current_app
config.set_main_option('sqlalchemy.url', current_app.config.get('SQLALCHEMY_DATABASE_URI'))
target_metadata = current_app.extensions['migrate'].db.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def exclude_tables_from_config(config_):
tables_ = config_.get("tables", None)
if tables_ is not None:
tables = tables_.split(", ")
return tables
else:
return []
exclude_tables = exclude_tables_from_config(config.get_section('excluded'))
def include_object(object, name, type_, reflected, compare_to):
if type_ == "unique_constraint":
print('+++++++++++++++++',dir(object))
if type_ == "table" and name in exclude_tables:
print('-----------------excluding----------------', name)
return False
else:
return True
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(url=url,
include_object = include_object)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
engine = engine_from_config(
config.get_section(config.config_ini_section),
prefix='sqlalchemy.',
poolclass=pool.NullPool)
connection = engine.connect()
context.configure(
connection=connection,
target_metadata=target_metadata,
include_object = include_object
)
try:
with context.begin_transaction():
context.run_migrations()
finally:
connection.close()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
class Project(Model):
__tablename__ = 'ProjectInfo'
__bind_key__ = 'third_party_db'
Id = Column(UNIQUEIDENTIFIER, primary_key=True)
ClientId = Column(UNIQUEIDENTIFIER, nullable=False, index=True)
Client = Column(Unicode(150), nullable=False, index=True)
Name = Column(Unicode(150), nullable=False)
Filename = Column(Unicode(160), nullable=False)
Folder = Column(Unicode(250), nullable=False, unique=True)
CheckOutMachine = Column(Unicode(50), nullable=False, server_default=text("('')"))
CheckOutUserName = Column(Unicode(100), nullable=False, server_default=text("('')"))
CheckInDateTime = Column(DateTime, nullable=False)
Deleted = Column(BIT, nullable=False, server_default=text("((0))"))
Archived = Column(BIT, nullable=False, server_default=text("((0))"))
Number = Column(Unicode(150))
Progress = Column(Unicode(50), index=True)
TotalPrice = Column(MONEY, nullable=False, server_default=text("((0))"))
Revision = Column(SmallInteger, nullable=False, server_default=text("((0))"))
StartDate = Column(DateTime)
EndDate = Column(DateTime)
Resources = Column(Unicode(2500))
CreatedOn = Column(DateTime, nullable=False)
UpdatedOn = Column(DateTime, nullable=False)
Notes = Column(Unicode(2000))
AcctgEstNumber = Column(Unicode(500))
SIClientId = Column(UNIQUEIDENTIFIER, nullable=False, index=True)
AssignedToId = Column(UNIQUEIDENTIFIER, index=True)
AssignedTo = Column(Unicode(150))
CustomField1 = Column(Unicode(300))
CustomField2 = Column(Unicode(300))
CustomField3 = Column(Unicode(300))
CustomField4 = Column(Unicode(300))
CustomField5 = Column(Unicode(300))
CustomField6 = Column(Unicode(300))
CustomField7 = Column(Unicode(300))
CustomField8 = Column(Unicode(300))
CustomField9 = Column(Unicode(1000))
CustomField10 = Column(Unicode(1000))
worksheets = relationship('Worksheet')
tasks = relationship('Task')
times = relationship('TaskTime')
parts = relationship('Part')
class Worksheet(Model):
__tablename__ = 'worksheets'
id = Column(Integer, primary_key=True)
manager = Column(Integer, ForeignKey('users.id'))
type = Column(String(10))
project_id = Column(UNIQUEIDENTIFIER, ForeignKey('ProjectInfo.Id'))
date = Column(DateTime())
billable = Column(Boolean)
completed = Column(Boolean)
title = Column(String(500))
notes = Column(String(1000))
tasks = relationship('WorksheetTaskMapping')
assignees = relationship('WorksheetAssignment')
project = relationship('Project')
def __init__(self, manager=None, type='standard', project_id=None, date=None, billable=True, completed=False, title='', notes=''):
self.manager = manager
self.type = type
self.project_id = project_id
self.date = date
self.billable = billable
self.completed = completed
self.title = title
self.notes = notes
def assign_to(self, assignee):
existing = WorksheetAssignment.query.filter_by(sheet_id=self.id).filter_by(user_id=assignee).first()
if existing:
print('existing assignment', existing)
pass
else:
print('adding new assignment')
assignment = WorksheetAssignment()
assignment.user_id = assignee
assignment.sheet_id = self.id
db.session.add(assignment)
db.session.commit()
def unassign_from(self, assignee):
existing = Worksheet.query.filter_by(id=self.id).filter_by(user_id=assignee).first()
if existing:
db.session.remove(existing)
db.session.commit()
else:
pass
def assign_tasks(self, task_list):
if len(self.tasks) > 0:
tasks_to_assign = [task for task in task_list if task not in self.tasks]
task_list = tasks_to_assign
else:
print('adding new assignment')
mapping = WorksheetTaskMapping()
mapping.task_id = self.id
mapping.sheet_id = self.id
db.session.add(mapping)
db.session.commit()
@spinningD20
Copy link
Author

obviously removed a lot of irrelevant stuff, but there's still plenty of irrelevant / unfinished methods, etc. Ignore those

@getadeo
Copy link

getadeo commented Mar 9, 2016

Hi, Thanks for this. Big help.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment