Last active
August 19, 2022 08:53
-
-
Save duck-nukem/1e2b4fc177039eb0c5f8c8c3fc0e4c95 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import os | |
from collections import defaultdict | |
from dataclasses import dataclass | |
from pathlib import Path | |
from time import time | |
from typing import Dict | |
from typing import List | |
import django | |
from django.db import connection | |
from django.db.migrations import Migration | |
from django.db.migrations import RunPython | |
from django.db.migrations import RunSQL | |
from django.db.migrations.loader import MigrationLoader | |
logger = logging.getLogger(__name__) | |
_NAME = 'ci_tool' | |
_APP_ROOT = Path(f'/opt/ci-tool/{_NAME}') | |
os.environ["DJANGO_SETTINGS_MODULE"] = f"{_NAME}.settings" | |
os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true" | |
_ANSI = { | |
'red': '\033[31m', | |
'green': '\033[32m', | |
'yellow': '\033[33m', | |
'blue': '\033[34m', | |
'bold': '\033[1m', | |
'reset': '\033[0m', | |
} | |
@dataclass | |
class MigrationFile: | |
path: Path | |
name: str | |
def sort_migrations(migrations): | |
return sorted(migrations, key=lambda m: int(m.name[0:4])) | |
def squash_migrations() -> Dict[str, List[str]]: | |
graph = MigrationLoader(connection, ignore_no_migrations=True).graph | |
app_labels = {app_label for app_label, _ in graph.nodes.keys()} | |
squashable = defaultdict(list) | |
for app_label in app_labels: | |
migrations = [ | |
m for m in graph.nodes.values() if | |
m.app_label == app_label | |
] | |
migrations = reversed(sort_migrations(migrations)) | |
for migration in migrations: | |
app_dependencies = [ | |
dependency_app_label == app_label | |
for dependency_app_label, _ in migration.dependencies | |
] | |
are_all_dependencies_internal = all(app_dependencies) | |
is_squashed_migration = len(migration.replaces) > 0 | |
is_data_migration = any([ | |
isinstance(o, RunPython) or isinstance(o, RunSQL) | |
for o in migration.operations | |
]) | |
is_squashable = ( | |
are_all_dependencies_internal | |
and not is_data_migration | |
and not is_squashed_migration | |
) | |
migration_identifier = f'{app_label} ~> {migration.name}' | |
if is_squashable: | |
print( | |
f'{_ANSI["green"]}{migration_identifier} is squashable') | |
squashable[app_label].append(migration) | |
elif len(squashable[app_label]) > 1: | |
print( | |
f'{_ANSI["red"]}{migration_identifier} is not squashable.\n' | |
f'{_ANSI["green"]}committing {len(squashable[app_label])} ' | |
f'migrations for squashing' | |
) | |
_delegate_squash_migration_to_system_call( | |
app_label, | |
sort_migrations(squashable[app_label]), | |
) | |
squashable.pop(app_label) | |
else: | |
print(f'{_ANSI["red"]}{migration_identifier} is not squashable') | |
squashable.pop(app_label) | |
return squashable | |
def _configure_logging(): | |
custom_formatter = logging.Formatter("%(message)s") | |
logging.getLogger().handlers[0].setFormatter(custom_formatter) | |
logging.getLogger().setLevel(logging.INFO) | |
def _delegate_squash_migration_to_system_call( | |
app_label: str, | |
migrations: List[Migration], | |
): | |
os.system( | |
f'python manage.py squashmigrations --noinput --no-header ' | |
f'{app_label} {migrations[0].name} {migrations[-1].name}' | |
) | |
if __name__ == '__main__': | |
start = time() | |
django.setup() | |
_configure_logging() | |
squashable_migrations = squash_migrations() | |
end = time() | |
logger.info( | |
f"\n🎉 {_ANSI['green']}{_ANSI['bold']}" | |
f"Done in {(end - start):.2f} seconds!" | |
f"{_ANSI['reset']}", | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment