Skip to content

Instantly share code, notes, and snippets.

@primoz-k
Forked from nick-merrill/merge_model_objects.py
Last active March 13, 2016 22:17
Show Gist options
  • Save primoz-k/35e77a0b20f2f55c61f9 to your computer and use it in GitHub Desktop.
Save primoz-k/35e77a0b20f2f55c61f9 to your computer and use it in GitHub Desktop.
A variation on a snippet that handles one-to-one relationships by recursively migrating those relationships' field data to the `primary_object`'s related object.
# -*- coding: utf-8 -*-
from django.db.models import Model
from django.contrib.contenttypes.fields import GenericForeignKey
from django.db.utils import IntegrityError
from django.db import transaction
from urllib import parse
try:
from django.apps import apps
get_models = apps.get_models
except ImportError:
from django.db.models.loading import get_models
def merge_model_objects(primary_object, alias_objects=None, keep_old=False):
    """
    Merge duplicate model instances into ``primary_object``.

    Use this function to merge model objects (i.e. Users, Organizations,
    Polls, etc.) and migrate all of the related fields from the alias objects
    to the primary object.

    Usage:
        from django.contrib.auth.models import User
        primary_user = User.objects.get(email='good_email@example.com')
        duplicate_user = User.objects.get(email='good_email+duplicate@example.com')
        merge_model_objects(primary_user, duplicate_user)

    For every alias object, in order:
      1. reverse foreign keys are re-pointed at ``primary_object``; a reverse
         one-to-one is re-pointed when the primary has no counterpart and is
         merged recursively into the primary's counterpart otherwise;
      2. many-to-many memberships are added to the primary and removed from
         the alias;
      3. generic foreign keys that target the alias are re-pointed at the
         primary (rows that would violate a constraint are left untouched);
      4. local fields that are blank (``None`` or ``''``) on the primary are
         filled from the first alias that has a non-blank value;
      5. the alias is deleted unless ``keep_old`` is true.

    Args:
        primary_object: The model instance that survives the merge.
        alias_objects: A single instance, or any iterable of instances (list,
            tuple, QuerySet, ...), of the same class as ``primary_object``.
        keep_old: When true, alias objects are not deleted after migration.

    Returns:
        ``primary_object``, after it has been saved.

    Raises:
        TypeError: If ``primary_object`` is not a Django model instance or an
            alias is not an instance of the primary's class.

    Shamelessly stolen and then modified from
    https://djangosnippets.org/snippets/2283/
    """
    # Normalise ``alias_objects`` to a list. A lone instance is wrapped; any
    # other iterable is materialised so tuples and QuerySets are accepted.
    if not alias_objects:
        alias_objects = []
    elif isinstance(alias_objects, Model) or not hasattr(alias_objects, '__iter__'):
        alias_objects = [alias_objects]
    else:
        alias_objects = list(alias_objects)

    # Check that all aliases are the same class as the primary one and that
    # they are subclasses of Model.
    primary_class = primary_object.__class__
    if not issubclass(primary_class, Model):
        raise TypeError('Only django.db.models.Model subclasses can be merged')
    for alias_object in alias_objects:
        if not isinstance(alias_object, primary_class):
            raise TypeError('Only models of same class can be merged')

    # Get a list of all GenericForeignKeys in all models.
    # TODO: this is a bit of a hack, since the generics framework should
    # provide a similar method to the ForeignKey field for accessing the
    # generic related fields.
    generic_fields = [
        attr
        for model in get_models()
        for attr in model.__dict__.values()
        if isinstance(attr, GenericForeignKey)
    ]

    # Local fields that are empty on the primary and may be filled from aliases.
    blank_local_fields = {
        field.attname
        for field in primary_object._meta.local_fields
        if getattr(primary_object, field.attname) in (None, '')
    }

    # Loop through all alias objects and migrate their data to the primary object.
    for alias_object in alias_objects:
        # Migrate all foreign key references from alias object to primary
        # object. ``_meta.get_all_related_objects()`` was removed in Django
        # 1.10; this filter over ``get_fields()`` is its documented equivalent.
        reverse_relations = [
            f for f in alias_object._meta.get_fields()
            if (f.one_to_many or f.one_to_one) and f.auto_created and not f.concrete
        ]
        for relation in reverse_relations:
            # The attribute name on the alias_object model.
            alias_varname = relation.get_accessor_name()
            # The field name on the related model that points back at us.
            obj_varname = relation.field.name

            if relation.one_to_one:
                # A reverse one-to-one accessor yields an instance, not a
                # manager, and raises an AttributeError subclass when no row
                # exists, so getattr's default covers the "missing" case.
                alias_related = getattr(alias_object, alias_varname, None)
                if alias_related is None:
                    continue
                primary_related = getattr(primary_object, alias_varname, None)
                if primary_related is None:
                    setattr(alias_related, obj_varname, primary_object)
                    alias_related.save()
                else:
                    # Both sides have a related row: fold the alias's row
                    # into the primary's row.
                    merge_model_objects(primary_related, alias_related, keep_old=keep_old)
                continue

            for obj in getattr(alias_object, alias_varname).all():
                setattr(obj, obj_varname, primary_object)
                obj.save()

        # Migrate all many to many references from alias object to primary
        # object. Hidden ("+") relations are excluded because they expose no
        # accessor attribute on the instance.
        for m2m in alias_object._meta.get_fields():
            if not m2m.many_to_many:
                continue
            if m2m.auto_created and not m2m.concrete:
                # Reverse side: the instance attribute is the accessor name
                # (e.g. ``book_set``), which can differ from ``m2m.name``.
                alias_fieldname = m2m.get_accessor_name()
            else:
                alias_fieldname = m2m.name
            if not alias_fieldname:
                continue
            # Transfer instances from alias to primary. Materialise first so
            # the same rows are used for both add() and remove().
            alias_related_objects = list(getattr(alias_object, alias_fieldname).all())
            if alias_related_objects:
                getattr(primary_object, alias_fieldname).add(*alias_related_objects)
                getattr(alias_object, alias_fieldname).remove(*alias_related_objects)

        # Migrate all generic foreign key references from alias object to
        # primary object.
        for field in generic_fields:
            filter_kwargs = {
                field.fk_field: alias_object.pk,
                field.ct_field: field.get_content_type(alias_object),
            }
            for generic_related_object in field.model.objects.filter(**filter_kwargs):
                try:
                    # The savepoint keeps a failed save from poisoning an
                    # enclosing transaction.
                    with transaction.atomic():
                        setattr(generic_related_object, field.name, primary_object)
                        generic_related_object.save()
                except IntegrityError:
                    # Deliberately skipped: an equivalent generic related
                    # object already exists on the primary object.
                    pass

        # Try to fill all missing values in primary object by values of duplicates.
        filled_up = set()
        for field_name in blank_local_fields:
            val = getattr(alias_object, field_name)
            if val not in [None, '', []]:
                setattr(primary_object, field_name, val)
                filled_up.add(field_name)
        # A field filled once must not be overwritten by a later alias.
        blank_local_fields -= filled_up

        if not keep_old:
            alias_object.delete()

    primary_object.save()
    return primary_object
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment