Skip to content

Instantly share code, notes, and snippets.

@staab
Created February 7, 2020 23:12
Show Gist options
  • Save staab/64784f18e5a1f9117f4fd9f0861e7d80 to your computer and use it in GitHub Desktop.
Save staab/64784f18e5a1f9117f4fd9f0861e7d80 to your computer and use it in GitHub Desktop.
import re, importlib.util, os
from time import time
from utils.logging import logger
from utils.dt import Dt
from utils.misc import uuid_str
from lib.sql.query import clauses as c
# Use a child of the shared project logger, named after this module.
logger = logger.getChild(__name__)
# NOTE(review): `c.sql` comes from lib.sql.query.clauses and is not visible
# here; it presumably wraps the text as a composable SQL object. Every query
# below built with it has a `{}` slot that `migrate` fills through `.format()`
# with `c.field(schema)`.
#
# Creates the target schema when it does not exist yet.
create_schema_q = c.sql("""
CREATE SCHEMA IF NOT EXISTS {}
""")
# Plain string (not wrapped in `c.sql`): yields a row when a table named
# 'migration' exists in the schema passed as the `schema` parameter.
table_exists_q = """
SELECT 1 from pg_tables
WHERE tablename = 'migration'
AND schemaname = %(schema)s
"""
# Creates the bookkeeping table: one row per applied migration, keyed by a
# uuid, tagged with the app name, and carrying both the version name and its
# numeric form.
create_table_q = c.sql("""
CREATE TABLE {}."migration" (
"id" uuid NOT NULL CONSTRAINT migration_pk PRIMARY KEY,
"app_name" varchar(255) NOT NULL,
"created" timestamp NOT NULL,
"version" varchar(255) NOT NULL,
"number" smallint NOT NULL
)
""")
# Selects the highest migration number recorded for the given app.
max_applied_q = c.sql("""
SELECT number
FROM {}.migration
WHERE app_name = %(app_name)s
ORDER BY number DESC
LIMIT 1
""")
# Records one applied migration.
add_migration_q = c.sql("""
INSERT INTO {}.migration (id, app_name, created, version, number)
VALUES (%(id)s, %(app_name)s, %(created)s, %(version)s, %(number)s)
""")
# Removes the given app's rows whose number is greater than `number`
# (`migrate` passes the highest known version index here).
delete_obsolete_migrations_q = c.sql("""
DELETE FROM {}.migration WHERE number > %(number)s AND app_name = %(app_name)s
""")
# Removes the given app's row for a single version name.
delete_migration_q = c.sql("""
DELETE FROM {}.migration WHERE version = %(version)s AND app_name = %(app_name)s
""")
def migrate(schema, app_name, all_versions, db, db_version):
    """Apply pending forward migrations for ``app_name`` inside ``schema``.

    The schema and its ``migration`` tracking table are created on demand.
    Rows numbered above the highest known version are deleted first (the
    situation after migrations were squashed) and only then is the most
    recently applied number read, so a squash is not mistaken for a request
    to migrate backward.

    Args:
        schema: Name of the database schema holding the ``migration`` table.
        app_name: Application whose rows in the ``migration`` table are used.
        all_versions: Ordered sequence of dicts, each with a ``'name'``
            (``v`` followed by four digits) and a ``'path'`` (directory that
            may contain ``forward.sql`` and/or ``forward.py``).
        db: Database handle providing ``execute``, ``scalar`` and ``using``.
        db_version: Index into ``all_versions`` to migrate to. It is clamped
            to the range of known versions.

    Raises:
        ValueError: If the target is older than what is already applied
            (backward migrations are not supported), or if a version name is
            malformed.
    """
    now = time()
    schema_sql = c.field(schema)

    # Make sure the given version is valid
    max_version = len(all_versions) - 1
    db_version = max(0, min(db_version, max_version))

    # Make sure the schema exists
    db.execute(create_schema_q.format(schema_sql))

    with db.using(schema):
        if db.scalar(table_exists_q, {'schema': schema}):
            # If we recently squashed migrations, delete migrations > max.
            # This has to happen before reading the latest applied number,
            # otherwise the stale number looks like a backward migration.
            db.execute(delete_obsolete_migrations_q.format(schema_sql), {
                'number': max_version,
                'app_name': app_name,
            })

            # Get our most recently applied migration (if any)
            max_applied = db.scalar(max_applied_q.format(schema_sql), {
                'app_name': app_name,
            })
        else:
            # Fresh table: nothing applied and nothing obsolete to delete
            db.execute(create_table_q.format(schema_sql))
            max_applied = None

    # Figure out which versions still need to be applied.
    # NOTE(review): this compares an index into all_versions with the number
    # parsed from version names; it assumes names are numbered like their
    # position (v0000, v0001, ...) -- confirm against callers.
    if max_applied is None:
        versions = all_versions[:db_version + 1]
    elif db_version > max_applied:
        versions = all_versions[max_applied + 1:db_version + 1]
    elif db_version < max_applied:
        raise ValueError(
            'Backward migrations are not supported (tried to migrate from {} to {})'.format(
                max_applied,
                db_version
            )
        )
    else:
        logger.info("No migrations to apply for db {}".format(schema))
        return

    def exec_file(path, name):
        """Run ``<path>/<name>.sql`` and then ``<path>/<name>.py``, whichever exist."""
        # Execute straight-up sql. Only the file read is guarded, so that a
        # FileNotFoundError raised while executing is not silently dropped.
        try:
            with open('{}/{}.sql'.format(path, name)) as f:
                sql = f.read()
        except FileNotFoundError:
            sql = None

        if sql is not None:
            db.execute(sql)

        # If there needs to be a python component, do that too
        py_path = '{}/{}.py'.format(path, name)
        if os.path.isfile(py_path):
            spec = importlib.util.spec_from_file_location(name, py_path)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            module.run()

    logger.info("Migrating {} db: {} -> {}".format(schema, max_applied, db_version))

    # Apply all selected migrations
    for version in versions:
        name = version['name']
        path = version['path']

        if not re.match(r'^v\d{4}$', name):
            raise ValueError('Badly named db version {}'.format(name))

        # Run it and track it
        with db.using(schema):
            exec_file(path, 'forward')
            db.execute(add_migration_q.format(schema_sql), {
                'id': uuid_str(),
                'app_name': app_name,
                'created': Dt.now(),
                'version': name,
                'number': int(name[1:]),
            })

    elapsed = int(time() - now)
    logger.info("Done applying migrations (took {} seconds)".format(elapsed))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment