Skip to content

Instantly share code, notes, and snippets.

@duck-nukem
Last active August 9, 2022 17:06
Show Gist options
  • Save duck-nukem/384f4b9d85787ff46ddf17e2a5d67a08 to your computer and use it in GitHub Desktop.
Save duck-nukem/384f4b9d85787ff46ddf17e2a5d67a08 to your computer and use it in GitHub Desktop.
import logging
import os
import re
from collections import defaultdict
from dataclasses import dataclass
from os import listdir
from os.path import isdir
from os.path import isfile
from os.path import splitext
from pathlib import Path
from time import time
from typing import Dict
from typing import List
from typing import NewType
from typing import Tuple
from typing import Union
import django
from django.db import connection
from django.db.migrations.loader import MigrationLoader
logger = logging.getLogger(__name__)
_NAME = 'ci_tool'
_APP_ROOT = Path(f'/opt/ci-tool/{_NAME}')
os.environ["DJANGO_SETTINGS_MODULE"] = f"{_NAME}.settings"
os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"
_ANSI = {
'red': '\033[31m',
'green': '\033[32m',
'yellow': '\033[33m',
'blue': '\033[34m',
'bold': '\033[1m',
'reset': '\033[0m',
}
@dataclass
class MigrationFile:
path: Path
name: str
Operations = NewType(
"Operations",
Union[Dict[str, list], Dict[Tuple[str, str], Tuple[str, str]]],
)
def collect_operations() -> Dict[str, Operations]:
graph = MigrationLoader(connection, ignore_no_migrations=True).graph
ops = {
'to_remove': defaultdict(list),
'squashed': defaultdict(list),
'dependencies': {},
}
for migration in graph.nodes.values():
if len(migration.replaces) == 0:
continue
ops['squashed'][migration.app_label].append(migration.name)
logger.debug(f"Found squashed migration: {migration.name}")
for app_name, filename in migration.replaces:
logger.debug(f"Marking {filename} for removal")
logger.debug(
f"Marking {migration.name} as a dependency "
f"of {filename} in {app_name}"
)
ops['to_remove'][app_name].append(filename)
# noinspection PyTypeChecker
ops['dependencies'][(app_name, filename)] = \
(app_name, migration.name)
return ops
def clean_migrations(
path: Path,
operations: Operations,
):
migrations_dir = path / 'migrations'
is_migrated_app_directory = os.path.isdir(migrations_dir)
if is_migrated_app_directory:
logger.info(
f"🔍 {_ANSI['bold']}"
f"Migrations found in {migrations_dir}"
f"{_ANSI['reset']}",
)
app_name = path.name
ignored_files = {'__init__.py', '__pycache__'}
directory_entries = set(os.listdir(migrations_dir))
migration_files = [
f for f in directory_entries - ignored_files
if isfile(migrations_dir / f) and splitext(f)[1] == '.py'
]
logger.debug(f"Migration files: {migration_files}")
for filename in sorted(migration_files):
migration_file = MigrationFile(
path=migrations_dir / filename,
name=filename.split('.py')[0],
)
if migration_file.name in operations['to_remove'][app_name]:
_remove_file(migration_file)
continue # File was removed, no need to perform further checks
elif migration_file.name in operations['squashed'][app_name]:
_remove_replaces_section(migration_file)
_search_and_replace_obsolete_migrations(
migration_file,
operations['dependencies'],
)
for app_name in listdir(path):
if isdir(path / app_name):
clean_migrations(path / app_name, operations)
def _remove_file(migration_file: MigrationFile) -> None:
"""
Once a squashed migration has been applied on all environments, we can
remove the original migrations, as they are no longer needed.
"""
os.remove(migration_file.path)
logger.info(
f"\t\t❌ {_ANSI['red']}"
f"Deleting obsolete migration {migration_file.name}"
f"{_ANSI['reset']}"
)
def _remove_replaces_section(migration_file: MigrationFile) -> None:
"""
Once a squashed migration has been applied on all environments, we can
convert it to a regular migration, to enable further future squashing.
"""
with open(migration_file.path, 'r+') as f:
content = f.read()
# Matches the content between "replaces = [" and the first "]"
replaced_migrations_section = re.sub(
r'\t?replaces = \[.*?(?=])]',
'',
content,
flags=re.DOTALL,
)
logger.debug(
f"Removing replaces section from {migration_file.name}\n"
f"{replaced_migrations_section}"
)
_replace_file_contents(f, replaced_migrations_section)
logger.info(
f"\t\tℹ️ {_ANSI['blue']}"
f"Removed 'replaces' from: {migration_file.name}. "
f"It's no longer a squashed migration."
f"{_ANSI['reset']}",
)
def _search_and_replace_obsolete_migrations(
migration_file: MigrationFile,
migration_dependencies,
):
"""
For each migration, checks the "dependencies" section
and if a dependency name matches one of the recently removed
migration (it was in the "replaces" section of a squashed migration)
it will replace the dependency with the squashed migration name.
"""
with open(migration_file.path, 'r+') as f:
content = f.read()
has_changed = False
dependency_tuples = _parse_dependencies(content)
for dep in dependency_tuples:
if dep not in migration_dependencies:
continue
has_changed = True
content = content.replace(
str(dep),
str(migration_dependencies[dep]),
)
logger.info(
f"\t\t♻️ {_ANSI['green']}"
f"Updating dependency: {dep} -> "
f"{migration_dependencies[dep]} in "
f"{migration_file.name}"
f"{_ANSI['reset']}",
)
if not has_changed:
logger.debug(f"No changes found in {migration_file.name}")
return
_replace_file_contents(f, content)
def _parse_dependencies(content: str) -> List[Tuple[str, str]]:
"""
Parses the "dependencies" section of a migration file, then
uses the unsafe eval to convert them to Python tuples.
Be careful to only run it on migration files you trust to avoid security
issues.
"""
# Regex matches the content between "dependencies = [" and the first "]"
dependencies_regex_match = re.findall(
r'\t?dependencies = \[(.*?)(?=])]',
content,
flags=re.DOTALL,
)
clean_dependency_match = dependencies_regex_match[0].__str__() \
.replace("\\n", "") \
.replace("\n", "") \
.strip()
# Removes "migrations.SWAPPABLE_MODEL"
# it's safe to be ignored, but it'd raise errors during parsing
clean_dependency_match = re.sub(
'migrations.*?(,)',
'',
clean_dependency_match,
flags=re.DOTALL,
)
logger.debug(f"Cleaned dependencies: {clean_dependency_match}")
dependency_tuples = eval(f'[{clean_dependency_match}]')
return dependency_tuples
def _replace_file_contents(file, content: str) -> None:
"""
Deletes everything from the file and then fills it with the new content.
"""
logger.debug(f"Replacing file contents: {file.name}\n{content}")
file.truncate(0)
file.seek(0)
file.write(content)
def _configure_logging():
custom_formatter = logging.Formatter("%(message)s")
logging.getLogger().handlers[0].setFormatter(custom_formatter)
logging.getLogger().setLevel(logging.INFO)
if __name__ == '__main__':
start = time()
django.setup()
_configure_logging()
clean_migrations(Path(_APP_ROOT), operations=collect_operations())
end = time()
logger.info(
f"\n🎉 {_ANSI['green']}{_ANSI['bold']}"
f"Done in {(end - start):.2f} seconds!"
f"{_ANSI['reset']}",
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment