Skip to content

Instantly share code, notes, and snippets.

@MattFanto
Last active November 16, 2023 16:51
Show Gist options
  • Save MattFanto/f6c0ee0bc392da1d0d90f28efdb77e40 to your computer and use it in GitHub Desktop.
Save MattFanto/f6c0ee0bc392da1d0d90f28efdb77e40 to your computer and use it in GitHub Desktop.
Django appenddata

An alternative to Django's loaddata command for cases where fixture objects must be appended to an existing database. Instead of merging fixture data with your existing models (as loaddata does), it appends every fixture object by resetting its primary key. M2M relations are handled at the end of the process, by mapping the old primary keys to the new primary keys:

Example of test (appending data from Website2 into Website1):

# Website 1
python manage.py dumpdata app1 app2 ... > test_append_data_fixtures_pre.json

# Website 2
python manage.py dumpdata app1 app2 ... > fixture_to_import.json

# Website 1, run the provided test
python manage.py test TestCustomCommands.test_appenddata

Example of usage (appending data from Website2 into Website1):

# Website 1
python manage.py appenddata fixture_to_import.json
import os
import warnings
from collections import defaultdict
from django.core.management import CommandError
from django.core.management.utils import parse_apps_and_model_labels
from django.core.management.commands.loaddata import Command as LoadDataCommand, humanize
from django.core.management.color import no_style
from django.db import (
DEFAULT_DB_ALIAS, DatabaseError, IntegrityError, connections, router
)
from django.core import serializers
from django.db import transaction
from django.db.models.fields.related import RelatedField, ManyToManyField
class Command(LoadDataCommand):
    """Append fixture objects to an existing database.

    Unlike ``loaddata``, which overwrites rows sharing a primary key with a
    fixture object, this command saves every fixture object as a new row:
    the primary key is reset before saving, foreign keys pointing at other
    fixture objects are rewritten to the newly assigned keys, and
    many-to-many relations are added once every object has been saved.

    NOTE: relations are resolved recursively and an object is only marked as
    saved after its own ``save()``; circular references between fixture
    objects (including a row referencing itself) are therefore not supported.
    """

    help = 'Installs the named fixture(s) in the database.'
    missing_args_message = (
        "No database fixture specified. Please provide the path of at least "
        "one fixture in the command line."
    )

    def add_arguments(self, parser):
        """Register the command line options on *parser*."""
        parser.add_argument('args', metavar='fixture', nargs='+', help='Fixture labels.')
        parser.add_argument(
            '--database', default=DEFAULT_DB_ALIAS,
            help='Nominates a specific database to load fixtures into. Defaults to the "default" database.',
        )
        parser.add_argument(
            '--app', dest='app_label',
            help='Only look for fixtures in the specified app.',
        )
        parser.add_argument(
            '-e', '--exclude', action='append', default=[],
            help='An app_label or app_label.ModelName to exclude. Can be used multiple times.',
        )
        parser.add_argument(
            '--format',
            help='Format of serialized data when reading from stdin.',
        )

    # TODO: this method mirrors the overridden ``handle`` except that it
    # dispatches to ``appenddata``; consider removing the duplication.
    def handle(self, *fixture_labels, **options):
        """Store the parsed options and append the fixtures atomically."""
        self.using = options['database']
        self.app_label = options['app_label']
        self.verbosity = options['verbosity']
        self.excluded_models, self.excluded_apps = parse_apps_and_model_labels(options['exclude'])
        self.format = options['format']

        with transaction.atomic(using=self.using):
            self.appenddata(fixture_labels)

        # Close the DB connection -- unless we're still in a transaction. This
        # is required as a workaround for an edge case in MySQL: if the same
        # connection is used to create tables, load data, and query, the query
        # can return incorrect results. See Django #7572, MySQL #37735.
        if transaction.get_autocommit(self.using):
            connections[self.using].close()

    def appenddata(self, fixture_labels):
        """Append the fixtures; delegates to the inherited ``loaddata``.

        The inherited driver is reused as is; the append behaviour comes from
        the ``load_label`` override defined on this class.
        """
        self.loaddata(fixture_labels)

    def load_label(self, fixture_label):
        """Load fixtures files for a given label.

        Every matching fixture file is deserialized into memory first. Only
        then are the objects saved (with fresh primary keys), and finally the
        many-to-many relations collected while saving are added.
        """
        self.objs_idx = ObjectDict()
        self.objects = []
        self.show_progress = self.verbosity >= 3
        # (instance, m2m attribute name, new related pk) triples added last.
        self.deferred_m2m = []

        for fixture_file, fixture_dir, fixture_name in self.find_fixtures(fixture_label):
            _, ser_fmt, cmp_fmt = self.parse_name(os.path.basename(fixture_file))
            open_method, mode = self.compression_formats[cmp_fmt]
            fixture = open_method(fixture_file, mode)
            try:
                self.fixture_count += 1
                objects_in_fixture = 0
                loaded_objects_in_fixture = 0
                if self.verbosity >= 2:
                    self.stdout.write(
                        "Installing %s fixture '%s' from %s."
                        % (ser_fmt, fixture_name, humanize(fixture_dir))
                    )

                objects = serializers.deserialize(
                    ser_fmt, fixture, using=self.using, ignorenonexistent=True,
                    handle_forward_references=False,
                )

                for obj in objects:
                    objects_in_fixture += 1
                    if (obj.object._meta.app_config in self.excluded_apps or
                            type(obj.object) in self.excluded_models):
                        continue
                    if router.allow_migrate_model(self.using, obj.object.__class__):
                        loaded_objects_in_fixture += 1
                        self.models.add(obj.object.__class__)
                        # Keep the whole fixture in memory, indexed by
                        # (model, original pk), so relations can be remapped.
                        self.objs_idx.append_deserialized_object(obj)
                        self.objects.append(obj)
                        if obj.deferred_fields:
                            self.objs_with_deferred_fields.append(obj)
                if objects and self.show_progress:
                    self.stdout.write('')  # add a newline after progress indicator
                self.loaded_object_count += loaded_objects_in_fixture
                self.fixture_object_count += objects_in_fixture
            except Exception as e:
                if not isinstance(e, CommandError):
                    e.args = ("Problem installing fixture '%s': %s" % (fixture_file, e),)
                raise
            finally:
                fixture.close()

            # Warn if the fixture we loaded contains 0 objects.
            if objects_in_fixture == 0:
                warnings.warn(
                    "No fixture data found for '%s'. (File format may be "
                    "invalid.)" % fixture_name,
                    RuntimeWarning
                )

        # With every object in memory, save them (related objects first).
        for obj in self.objects:
            self.process_object(obj.object)
        if self.verbosity >= 1:
            self.stdout.write('... All objects saved ...')

        # All objects now have their new pks, so the m2m relations recorded
        # while saving can be added.
        for obj, field_attname, related_pk in self.deferred_m2m:
            getattr(obj, field_attname).add(related_pk)
            if self.verbosity >= 3:
                self.stdout.write('Adding relation for field {0}: {1} -> {2}'.format(field_attname, obj.pk, related_pk))

    def process_object(self, obj):
        """Save *obj* as a new row and return its newly assigned pk.

        Related fixture objects are saved first so that the foreign keys of
        *obj* can point at their new pks. Calling this again for an object
        that has already been saved just returns the recorded new pk.

        Raises:
            ValueError: if *obj* is ``None``.
            DatabaseError, IntegrityError, ValueError: re-raised from
                ``save()`` with a message identifying the fixture object.
        """
        if obj is None:
            raise ValueError('None object in process object')

        record = self.objs_idx[obj]
        # Already saved: nothing left to do.
        if record['new_pk']:
            return record['new_pk']

        old_pk = obj.pk
        self.manage_related_field(obj)
        if self.verbosity >= 2:
            self.stdout.write('Saving object: (%s, %s)' % (obj.__class__, obj))

        # Dropping the pk makes save() insert a new row.
        obj.pk = None
        try:
            obj.save(using=self.using)
        # psycopg2 raises ValueError if data contains NUL chars.
        except (DatabaseError, IntegrityError, ValueError) as e:
            # ``obj`` is the model instance itself (not a DeserializedObject)
            # and its pk has just been reset, so report the original pk.
            e.args = ("Could not load %(app_label)s.%(object_name)s(pk=%(pk)s): %(error_msg)s" % {
                'app_label': obj._meta.app_label,
                'object_name': obj._meta.object_name,
                'pk': old_pk,
                'error_msg': e,
            },)
            raise
        if self.show_progress:
            self.stdout.write(
                '\rSaving object: (%s, %s)' % (obj.__class__, obj),
                ending=''
            )

        record['new_pk'] = obj.pk
        return obj.pk

    def manage_related_field(self, obj):
        """Save the fixture objects *obj* relates to and remap its relations.

        Foreign keys are rewritten in place to the new pk of the related
        fixture object; a foreign key whose target is not part of the fixture
        keeps its original value. Many-to-many targets are saved here but the
        relation itself is only queued in ``self.deferred_m2m``, because
        *obj* has no row of its own yet.
        """
        for field in obj._meta.get_fields():
            if not isinstance(field, RelatedField):
                continue
            related_model = field.related_model
            # Excluded models/apps were never indexed by load_label.
            if (related_model in self.excluded_models or
                    related_model._meta.app_config in self.excluded_apps):
                continue

            if isinstance(field, ManyToManyField):
                # The relation manager must not be touched here: obj still
                # carries the fixture's original pk, so clearing the relation
                # would delete the m2m rows of an unrelated existing row.
                deserialized = self.objs_idx[obj]['deserialized_object']
                # Fields missing from the fixture have no m2m_data entry.
                for m2m_pk in deserialized.m2m_data.get(field.name, ()):
                    related_obj = self.objs_idx.data[related_model][m2m_pk]['object']
                    new_related_pk = self.process_object(related_obj)
                    self.deferred_m2m.append((obj, field.attname, new_related_pk))
            else:
                old_related_pk = getattr(obj, field.attname)
                if old_related_pk is None:
                    continue
                related_obj = self.objs_idx.data[related_model][old_related_pk]['object']
                if related_obj is not None:
                    setattr(obj, field.attname, self.process_object(related_obj))
class ObjectDict:
    """
    Dictionary to easily retrieve fixture object based on class and their original primary key

    ``data`` maps ``model -> original pk -> record`` where a record is a dict
    with the keys ``'new_pk'``, ``'object'`` and ``'deserialized_object'``.
    Looking up an unknown model/pk creates a placeholder record whose values
    are all ``None``.
    """

    def __init__(self):
        self.data = defaultdict(
            lambda: defaultdict(
                # Placeholders carry the same keys as registered records.
                lambda: {'new_pk': None, 'object': None, 'deserialized_object': None}
            )
        )

    @classmethod
    def from_deserialized_objects(cls, deserialized_objects):
        """Build an index from an iterable of deserialized objects.

        Returns an instance of the class it is called on, so subclasses get
        an index of their own type.
        """
        instance = cls()
        for deserialized_object in deserialized_objects:
            instance.append_deserialized_object(deserialized_object)
        return instance

    def __getitem__(self, item):
        """Return the record of model instance *item*.

        *item* must carry the ``old_pk`` attribute that
        ``append_deserialized_object`` sets.
        """
        return self.data[item._meta.model][item.old_pk]

    def append_deserialized_object(self, deserialized_object):
        """Register *deserialized_object* under its model and current pk.

        The current pk is remembered on the instance as ``old_pk`` so the
        record can still be found after the pk has been changed.
        """
        obj = deserialized_object.object
        obj.old_pk = obj.pk
        record = self[obj]
        record['object'] = obj
        record['deserialized_object'] = deserialized_object
from collections import defaultdict
from django.core import serializers
from django.core.management import call_command
from django.test import TestCase
from django.apps import apps
class TestCustomCommands(TestCase):
    """Checks the ``appenddata`` command by comparing per-model row counts."""

    # State of the target database before the fixture is appended.
    fixtures = ['tmp/dump/test_append_data_fixtures_pre.json']

    def test_appenddata(self):
        """Every non-excluded fixture object must end up as one new row."""
        fixture_to_import = 'tmp/dump/or_pmi_data.json'
        exclude = 'sites.Site'

        # Counts objects before appenddata per model
        count_pre = {model: model.objects.count() for model in apps.get_models()}

        # Counts objects to append per model. The deserializer is lazy, so it
        # has to be consumed while the file is still open.
        count_new = defaultdict(int)
        with open(fixture_to_import, 'r') as f:
            for obj in serializers.deserialize('json', f, ignorenonexistent=True):
                # Objects of the excluded model are skipped by the command
                # and therefore must not be expected as new rows.
                if obj.object._meta.label == exclude:
                    continue
                count_new[obj.object._meta.model] += 1

        call_command('appenddata', fixture_to_import, '-e', exclude, '-v', '0')

        # Verify with count that all objects have been imported
        for model in apps.get_models():
            self.assertEqual(
                count_pre[model] + count_new[model],
                model.objects.count(),
                msg='Count mismatch for model %s' % model,
            )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment