|
import os |
|
import warnings |
|
from collections import defaultdict |
|
|
|
from django.core.management import CommandError |
|
from django.core.management.utils import parse_apps_and_model_labels |
|
from django.core.management.commands.loaddata import Command as LoadDataCommand, humanize |
|
from django.core.management.color import no_style |
|
from django.db import ( |
|
DEFAULT_DB_ALIAS, DatabaseError, IntegrityError, connections, router |
|
) |
|
|
|
from django.core import serializers |
|
from django.db import transaction |
|
from django.db.models.fields.related import RelatedField, ManyToManyField |
|
|
|
|
|
class Command(LoadDataCommand): |
|
help = 'Installs the named fixture(s) in the database.' |
|
missing_args_message = ( |
|
"No database fixture specified. Please provide the path of at least " |
|
"one fixture in the command line." |
|
) |
|
|
|
def add_arguments(self, parser): |
|
parser.add_argument('args', metavar='fixture', nargs='+', help='Fixture labels.') |
|
parser.add_argument( |
|
'--database', default=DEFAULT_DB_ALIAS, |
|
help='Nominates a specific database to load fixtures into. Defaults to the "default" database.', |
|
) |
|
parser.add_argument( |
|
'--app', dest='app_label', |
|
help='Only look for fixtures in the specified app.', |
|
) |
|
parser.add_argument( |
|
'-e', '--exclude', action='append', default=[], |
|
help='An app_label or app_label.ModelName to exclude. Can be used multiple times.', |
|
) |
|
parser.add_argument( |
|
'--format', |
|
help='Format of serialized data when reading from stdin.', |
|
) |
|
|
|
# TODO delete equals to overridden |
|
def handle(self, *fixture_labels, **options): |
|
self.using = options['database'] |
|
self.app_label = options['app_label'] |
|
self.verbosity = options['verbosity'] |
|
self.excluded_models, self.excluded_apps = parse_apps_and_model_labels(options['exclude']) |
|
self.format = options['format'] |
|
|
|
with transaction.atomic(using=self.using): |
|
self.appenddata(fixture_labels) |
|
|
|
# Close the DB connection -- unless we're still in a transaction. This |
|
# is required as a workaround for an edge case in MySQL: if the same |
|
# connection is used to create tables, load data, and query, the query |
|
# can return incorrect results. See Django #7572, MySQL #37735. |
|
if transaction.get_autocommit(self.using): |
|
connections[self.using].close() |
|
|
|
def appenddata(self, fixture_labels): |
|
# We could reuse most of the code |
|
self.loaddata(fixture_labels) |
|
|
|
def load_label(self, fixture_label): |
|
"""Load fixtures files for a given label.""" |
|
self.objs_idx = ObjectDict() |
|
self.objects = [] |
|
self.show_progress = self.verbosity >= 3 |
|
self.deferred_m2m = [] |
|
|
|
for fixture_file, fixture_dir, fixture_name in self.find_fixtures(fixture_label): |
|
_, ser_fmt, cmp_fmt = self.parse_name(os.path.basename(fixture_file)) |
|
open_method, mode = self.compression_formats[cmp_fmt] |
|
fixture = open_method(fixture_file, mode) |
|
try: |
|
self.fixture_count += 1 |
|
objects_in_fixture = 0 |
|
loaded_objects_in_fixture = 0 |
|
if self.verbosity >= 2: |
|
self.stdout.write( |
|
"Installing %s fixture '%s' from %s." |
|
% (ser_fmt, fixture_name, humanize(fixture_dir)) |
|
) |
|
|
|
objects = serializers.deserialize( |
|
ser_fmt, fixture, using=self.using, ignorenonexistent=True, |
|
handle_forward_references=False, |
|
) |
|
|
|
for obj in objects: |
|
objects_in_fixture += 1 |
|
if (obj.object._meta.app_config in self.excluded_apps or |
|
type(obj.object) in self.excluded_models): |
|
continue |
|
if router.allow_migrate_model(self.using, obj.object.__class__): |
|
loaded_objects_in_fixture += 1 |
|
self.models.add(obj.object.__class__) |
|
# Load all fixture in memory |
|
self.objs_idx.append_deserialized_object(obj) |
|
self.objects.append(obj) |
|
if obj.deferred_fields: |
|
self.objs_with_deferred_fields.append(obj) |
|
if objects and self.show_progress: |
|
self.stdout.write('') # add a newline after progress indicator |
|
self.loaded_object_count += loaded_objects_in_fixture |
|
self.fixture_object_count += objects_in_fixture |
|
except Exception as e: |
|
if not isinstance(e, CommandError): |
|
e.args = ("Problem installing fixture '%s': %s" % (fixture_file, e),) |
|
raise e |
|
finally: |
|
fixture.close() |
|
|
|
# Warn if the fixture we loaded contains 0 objects. |
|
if objects_in_fixture == 0: |
|
warnings.warn( |
|
"No fixture data found for '%s'. (File format may be " |
|
"invalid.)" % fixture_name, |
|
RuntimeWarning |
|
) |
|
# Once you have all object in memory you can load them |
|
for obj in self.objects: |
|
self.process_object(obj.object) |
|
if self.verbosity >= 1: |
|
self.stdout.write('... All objects saved ...') |
|
# Once all objects have been save (append mode) and new pks have been assigned add m2m relations |
|
for obj, field_attname, related_pk in self.deferred_m2m: |
|
attr = getattr(obj, field_attname) |
|
attr.add(related_pk) |
|
if self.verbosity >= 3: |
|
self.stdout.write('Adding relation for field {0}: {1} -> {2}'.format(field_attname, obj.pk, related_pk)) |
|
# Disabled for security reason |
|
# raise ValueError('Disabled') |
|
|
|
def process_object(self, obj): |
|
if obj is None: |
|
raise ValueError('None object in process object') |
|
|
|
old_pk = obj.pk |
|
new_pk = self.objs_idx[obj]['new_pk'] |
|
# Object has been save yet no work |
|
if new_pk: |
|
return new_pk |
|
|
|
self.manage_related_field(obj) |
|
|
|
if self.verbosity >= 2: |
|
self.stdout.write('Saving object: (%s, %s)' % (obj.__class__, obj)) |
|
obj.pk = None |
|
try: |
|
obj.save(using=self.using) |
|
if self.show_progress: |
|
self.stdout.write( |
|
'\rSaving object: (%s, %s)' % (obj.__class__, obj), |
|
ending='' |
|
) |
|
# psycopg2 raises ValueError if data contains NUL chars. |
|
except (DatabaseError, IntegrityError, ValueError) as e: |
|
e.args = ("Could not load %(app_label)s.%(object_name)s(pk=%(pk)s): %(error_msg)s" % { |
|
'app_label': obj.object._meta.app_label, |
|
'object_name': obj.object._meta.object_name, |
|
'pk': obj.object.pk, |
|
'error_msg': e, |
|
},) |
|
raise |
|
self.objs_idx.data[obj._meta.model][old_pk]['new_pk'] = obj.pk |
|
return obj.pk |
|
|
|
def manage_related_field(self, obj): |
|
related_fields = [field for field in obj._meta.get_fields() if isinstance(field, RelatedField)] |
|
if len(related_fields) > 0: # has not related field |
|
for field in related_fields: |
|
if field.related_model in self.excluded_models: |
|
continue |
|
|
|
if type(field) is ManyToManyField: |
|
attr = getattr(obj, field.attname) |
|
attr.clear() |
|
m2m_pks = self.objs_idx[obj]['deserialized_object'].m2m_data[field.name] |
|
for m2m_pk in m2m_pks: |
|
related_obj = self.objs_idx.data[field.related_model][m2m_pk]['object'] |
|
new_related_pk = self.process_object(related_obj) |
|
self.deferred_m2m.append((obj, field.attname, new_related_pk)) |
|
# attr.add(new_related_pk) |
|
else: |
|
related_obj = self.objs_idx.data[field.related_model][getattr(obj, field.attname)]['object'] |
|
if related_obj is not None: |
|
new_related_pk = self.process_object(related_obj) |
|
setattr(obj, field.attname, new_related_pk) |
|
|
|
|
|
class ObjectDict(object): |
|
""" |
|
Dictionary to easily retrieve fixture object based on class and their original primary key |
|
""" |
|
|
|
def __init__(self): |
|
self.data = defaultdict(lambda: defaultdict(lambda: {'new_pk': None, 'object': None})) |
|
|
|
@staticmethod |
|
def from_deserialized_objects(deserialized_objects): |
|
instance = ObjectDict() |
|
for deserialized_object in deserialized_objects: |
|
instance.append_deserialized_object(deserialized_object) |
|
return instance |
|
|
|
def __getitem__(self, item): |
|
return self.data[item._meta.model][item.old_pk] |
|
|
|
def append_deserialized_object(self, deserialized_object): |
|
obj = deserialized_object.object |
|
setattr(obj, 'old_pk', obj.pk) |
|
self[obj]['object'] = obj |
|
self[obj]['deserialized_object'] = deserialized_object |