Last active
December 15, 2022 14:18
-
-
Save vdboor/06ad57eaa5a463cd03423f85f1fdc6b8 to your computer and use it in GitHub Desktop.
Running the Django migration engine for custom models
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.core.management import BaseCommand | |
from django.db import connection, models | |
from django.db.migrations import Migration | |
from django.db.migrations.autodetector import MigrationAutodetector | |
from django.db.migrations.graph import MigrationGraph | |
from django.db.migrations.questioner import InteractiveMigrationQuestioner | |
from django.db.migrations.state import ModelState, ProjectState | |
class PatchedModelState(ModelState):
    """ModelState variant that works around broken migration rendering."""

    def clone(self):
        """Return a copy of this ModelState after refreshing its fields.

        Workaround for a Django issue: the fields get bound, which breaks
        reusing them in migrations. As a quick fix every field is cloned too.

        Note that ``self.fields`` itself is rebound to the fresh field copies
        before the base-class ``clone()`` is invoked.
        """
        fresh_fields = {}
        for field_name, field in self.fields.items():
            fresh_fields[field_name] = field.clone()
        self.fields = fresh_fields
        return super().clone()
class Command(BaseCommand):
    """Demonstration of using Django migrations.

    Two hard-coded versions of one model ("MyModel" in the "dummy_demo" app)
    are wrapped in project states, compared with the migration autodetector,
    and the SQL collected while applying the detected migrations is written
    to stdout.
    """

    # Empty list: this command does not ask for any system checks.
    requires_system_checks = []

    def handle(self, *args, **options):
        """Detect the changes between two field sets and print their SQL.

        Writes "No changes detected" when the autodetector reports nothing.
        Otherwise every detected migration is passed to ``_print_sql``, and
        the state returned for one migration is the input for the next.
        """
        fields1 = {
            "field1": models.CharField(max_length=200, blank=True),
        }
        fields2 = {
            "field1": models.CharField(max_length=300),
            "field2": models.IntegerField(default=0),
        }
        state1 = self._get_state_raw(fields1)
        state2 = self._get_state_raw(fields2)

        detector = MigrationAutodetector(
            from_state=state1,
            to_state=state2,
            questioner=InteractiveMigrationQuestioner(specified_apps=["dummy_demo"]),
        )
        # An empty graph: there are no earlier migrations to depend on.
        dependency_graph = MigrationGraph()
        changes: dict[str, list[Migration]] = detector.changes(
            dependency_graph,
            trim_to_apps=["dummy_demo"],
            migration_name="dummy",
        )
        if not changes:
            self.stdout.write("No changes detected")
            return

        # Print changes. Only the migrations are needed, not the app labels.
        start_state = state1
        for app_migrations in changes.values():
            for migration in app_migrations:
                start_state = self._print_sql(start_state, migration)

    def _print_sql(self, start_state: ProjectState, migration: Migration) -> ProjectState:
        """Print the SQL statements for a migration.

        A header line with the migration name and one line per operation are
        written first. The migration is then applied through a schema editor
        created with ``collect_sql=True``, and the collected statements are
        written to stdout.

        Args:
            start_state: Project state the migration is applied on top of.
            migration: The migration whose SQL should be shown.

        Returns:
            The project state returned by ``migration.apply()``.

        Raises:
            Exception: Anything raised by ``migration.apply()`` is re-raised
                after the statements collected so far have been written.
        """
        self.stdout.write(f"-- Migration: {migration.name}")
        for operation in migration.operations:
            self.stdout.write(f"-- {operation}")

        with connection.schema_editor(collect_sql=True, atomic=migration.atomic) as schema_editor:
            # TODO/FIXME: currently the start_state gets bound models after an operation.
            try:
                start_state = migration.apply(start_state, schema_editor, collect_sql=True)
            except Exception:
                # On crashes, still show the generated statements so far
                self.stdout.write("\n".join(schema_editor.collected_sql))
                raise

        # Written only once the editor context has been left.
        # NOTE(review): presumably so statements added on exit are included - confirm.
        self.stdout.write("\n".join(schema_editor.collected_sql))
        return start_state

    def _get_state(self, fields: dict) -> ProjectState:
        """Approach 1 to create a project state (from an existing model).

        Args:
            fields: Mapping of field name to model field; used as class
                attributes of a dynamically created "MyModel" model class.

        Returns:
            A project state containing the model state derived from that class.
        """
        ModelClass = type(
            "MyModel",
            (models.Model,),
            {
                "__module__": "dummy_demo.models",
                "Meta": type("Meta", (), {"app_label": "dummy_demo"}),
                **fields,
            },
        )

        # real_apps is left at its default: Django 4.0+ requires a set when the
        # argument is given, so passing a list fails there. Older releases treat
        # a missing value as empty.
        state = ProjectState()
        # Generate the model state.
        # Use exclude_rels=True when M2M-through tables are generated manually.
        state.add_model(PatchedModelState.from_model(ModelClass, exclude_rels=False))
        return state

    def _get_state_raw(self, fields: dict) -> ProjectState:
        """Approach 2 to create a project state (from raw unbound fields).

        Args:
            fields: Mapping of field name to model field, handed directly to
                ``PatchedModelState`` for the model "MyModel".

        Returns:
            A project state containing that single model state.
        """
        # real_apps is left at its default: Django 4.0+ requires a set when the
        # argument is given, so passing a list fails there. Older releases treat
        # a missing value as empty.
        state = ProjectState()
        state.add_model(
            PatchedModelState(
                app_label="dummy_demo",
                name="MyModel",
                fields=fields,
            )
        )
        return state
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment