-
-
Save duck-nukem/384f4b9d85787ff46ddf17e2a5d67a08 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import os | |
import re | |
from collections import defaultdict | |
from dataclasses import dataclass | |
from os import listdir | |
from os.path import isdir | |
from os.path import isfile | |
from os.path import splitext | |
from pathlib import Path | |
from time import time | |
from typing import Dict | |
from typing import List | |
from typing import NewType | |
from typing import Tuple | |
from typing import Union | |
import django | |
from django.db import connection | |
from django.db.migrations.loader import MigrationLoader | |
logger = logging.getLogger(__name__) | |
_NAME = 'ci_tool' | |
_APP_ROOT = Path(f'/opt/ci-tool/{_NAME}') | |
os.environ["DJANGO_SETTINGS_MODULE"] = f"{_NAME}.settings" | |
os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true" | |
_ANSI = { | |
'red': '\033[31m', | |
'green': '\033[32m', | |
'yellow': '\033[33m', | |
'blue': '\033[34m', | |
'bold': '\033[1m', | |
'reset': '\033[0m', | |
} | |
@dataclass | |
class MigrationFile: | |
path: Path | |
name: str | |
Operations = NewType( | |
"Operations", | |
Union[Dict[str, list], Dict[Tuple[str, str], Tuple[str, str]]], | |
) | |
def collect_operations() -> Dict[str, Operations]: | |
graph = MigrationLoader(connection, ignore_no_migrations=True).graph | |
ops = { | |
'to_remove': defaultdict(list), | |
'squashed': defaultdict(list), | |
'dependencies': {}, | |
} | |
for migration in graph.nodes.values(): | |
if len(migration.replaces) == 0: | |
continue | |
ops['squashed'][migration.app_label].append(migration.name) | |
logger.debug(f"Found squashed migration: {migration.name}") | |
for app_name, filename in migration.replaces: | |
logger.debug(f"Marking {filename} for removal") | |
logger.debug( | |
f"Marking {migration.name} as a dependency " | |
f"of {filename} in {app_name}" | |
) | |
ops['to_remove'][app_name].append(filename) | |
# noinspection PyTypeChecker | |
ops['dependencies'][(app_name, filename)] = \ | |
(app_name, migration.name) | |
return ops | |
def clean_migrations( | |
path: Path, | |
operations: Operations, | |
): | |
migrations_dir = path / 'migrations' | |
is_migrated_app_directory = os.path.isdir(migrations_dir) | |
if is_migrated_app_directory: | |
logger.info( | |
f"🔍 {_ANSI['bold']}" | |
f"Migrations found in {migrations_dir}" | |
f"{_ANSI['reset']}", | |
) | |
app_name = path.name | |
ignored_files = {'__init__.py', '__pycache__'} | |
directory_entries = set(os.listdir(migrations_dir)) | |
migration_files = [ | |
f for f in directory_entries - ignored_files | |
if isfile(migrations_dir / f) and splitext(f)[1] == '.py' | |
] | |
logger.debug(f"Migration files: {migration_files}") | |
for filename in sorted(migration_files): | |
migration_file = MigrationFile( | |
path=migrations_dir / filename, | |
name=filename.split('.py')[0], | |
) | |
if migration_file.name in operations['to_remove'][app_name]: | |
_remove_file(migration_file) | |
continue # File was removed, no need to perform further checks | |
elif migration_file.name in operations['squashed'][app_name]: | |
_remove_replaces_section(migration_file) | |
_search_and_replace_obsolete_migrations( | |
migration_file, | |
operations['dependencies'], | |
) | |
for app_name in listdir(path): | |
if isdir(path / app_name): | |
clean_migrations(path / app_name, operations) | |
def _remove_file(migration_file: MigrationFile) -> None: | |
""" | |
Once a squashed migration has been applied on all environments, we can | |
remove the original migrations, as they are no longer needed. | |
""" | |
os.remove(migration_file.path) | |
logger.info( | |
f"\t\t❌ {_ANSI['red']}" | |
f"Deleting obsolete migration {migration_file.name}" | |
f"{_ANSI['reset']}" | |
) | |
def _remove_replaces_section(migration_file: MigrationFile) -> None: | |
""" | |
Once a squashed migration has been applied on all environments, we can | |
convert it to a regular migration, to enable further future squashing. | |
""" | |
with open(migration_file.path, 'r+') as f: | |
content = f.read() | |
# Matches the content between "replaces = [" and the first "]" | |
replaced_migrations_section = re.sub( | |
r'\t?replaces = \[.*?(?=])]', | |
'', | |
content, | |
flags=re.DOTALL, | |
) | |
logger.debug( | |
f"Removing replaces section from {migration_file.name}\n" | |
f"{replaced_migrations_section}" | |
) | |
_replace_file_contents(f, replaced_migrations_section) | |
logger.info( | |
f"\t\tℹ️ {_ANSI['blue']}" | |
f"Removed 'replaces' from: {migration_file.name}. " | |
f"It's no longer a squashed migration." | |
f"{_ANSI['reset']}", | |
) | |
def _search_and_replace_obsolete_migrations( | |
migration_file: MigrationFile, | |
migration_dependencies, | |
): | |
""" | |
For each migration, checks the "dependencies" section | |
and if a dependency name matches one of the recently removed | |
migration (it was in the "replaces" section of a squashed migration) | |
it will replace the dependency with the squashed migration name. | |
""" | |
with open(migration_file.path, 'r+') as f: | |
content = f.read() | |
has_changed = False | |
dependency_tuples = _parse_dependencies(content) | |
for dep in dependency_tuples: | |
if dep not in migration_dependencies: | |
continue | |
has_changed = True | |
content = content.replace( | |
str(dep), | |
str(migration_dependencies[dep]), | |
) | |
logger.info( | |
f"\t\t♻️ {_ANSI['green']}" | |
f"Updating dependency: {dep} -> " | |
f"{migration_dependencies[dep]} in " | |
f"{migration_file.name}" | |
f"{_ANSI['reset']}", | |
) | |
if not has_changed: | |
logger.debug(f"No changes found in {migration_file.name}") | |
return | |
_replace_file_contents(f, content) | |
def _parse_dependencies(content: str) -> List[Tuple[str, str]]: | |
""" | |
Parses the "dependencies" section of a migration file, then | |
uses the unsafe eval to convert them to Python tuples. | |
Be careful to only run it on migration files you trust to avoid security | |
issues. | |
""" | |
# Regex matches the content between "dependencies = [" and the first "]" | |
dependencies_regex_match = re.findall( | |
r'\t?dependencies = \[(.*?)(?=])]', | |
content, | |
flags=re.DOTALL, | |
) | |
clean_dependency_match = dependencies_regex_match[0].__str__() \ | |
.replace("\\n", "") \ | |
.replace("\n", "") \ | |
.strip() | |
# Removes "migrations.SWAPPABLE_MODEL" | |
# it's safe to be ignored, but it'd raise errors during parsing | |
clean_dependency_match = re.sub( | |
'migrations.*?(,)', | |
'', | |
clean_dependency_match, | |
flags=re.DOTALL, | |
) | |
logger.debug(f"Cleaned dependencies: {clean_dependency_match}") | |
dependency_tuples = eval(f'[{clean_dependency_match}]') | |
return dependency_tuples | |
def _replace_file_contents(file, content: str) -> None: | |
""" | |
Deletes everything from the file and then fills it with the new content. | |
""" | |
logger.debug(f"Replacing file contents: {file.name}\n{content}") | |
file.truncate(0) | |
file.seek(0) | |
file.write(content) | |
def _configure_logging(): | |
custom_formatter = logging.Formatter("%(message)s") | |
logging.getLogger().handlers[0].setFormatter(custom_formatter) | |
logging.getLogger().setLevel(logging.INFO) | |
if __name__ == '__main__': | |
start = time() | |
django.setup() | |
_configure_logging() | |
clean_migrations(Path(_APP_ROOT), operations=collect_operations()) | |
end = time() | |
logger.info( | |
f"\n🎉 {_ANSI['green']}{_ANSI['bold']}" | |
f"Done in {(end - start):.2f} seconds!" | |
f"{_ANSI['reset']}", | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment