-
-
Save primoz-k/35e77a0b20f2f55c61f9 to your computer and use it in GitHub Desktop.
A variation on a snippet that handles one-to-one relationships by recursively migrating those relationships' field data to the `primary_object`'s related object.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
from urllib import parse

from django.contrib.contenttypes.fields import GenericForeignKey
from django.db import transaction
from django.db.models import Model
from django.db.utils import IntegrityError

# Resolve a ``get_models`` callable that lists the installed model classes.
try:
    # Preferred path: take it from the application registry.
    from django.apps import apps
    get_models = apps.get_models
except ImportError:
    # Fallback for Django installs where ``django.apps`` cannot be imported.
    from django.db.models.loading import get_models
def merge_model_objects(primary_object, alias_objects=None, keep_old=False):
    """
    Merge duplicate model instances into ``primary_object``.

    For every alias the following is migrated to the primary object:

    * reverse foreign keys: each referring row is re-pointed and saved;
    * reverse one-to-one relations: the alias's related row is re-pointed
      when the primary has none, otherwise the two related rows are merged
      recursively with this same function;
    * many-to-many memberships: added to the primary and removed from the
      alias (the removal happens even when ``keep_old`` is true);
    * generic foreign keys: re-pointed; rows whose save raises
      ``IntegrityError`` are left untouched;
    * local fields that are ``None`` or ``''`` on the primary are filled
      from the first alias holding a non-empty value.

    An alias whose primary key equals the primary object's is skipped so the
    primary row is never deleted. The function does not open an outer
    transaction itself; wrap the call in ``transaction.atomic()`` if an
    all-or-nothing merge is required.

    Usage:
        from django.contrib.auth.models import User
        primary_user = User.objects.get(email='good_email@example.com')
        duplicate_user = User.objects.get(email='good_email+duplicate@example.com')
        merge_model_objects(primary_user, duplicate_user)

    Shamelessly stolen and then modified from
    https://djangosnippets.org/snippets/2283/

    Args:
        primary_object: the model instance that survives the merge.
        alias_objects: a single instance, or any iterable of instances
            (list, tuple, set, QuerySet, ...), of the same class as
            ``primary_object``. ``None`` or an empty value merges nothing.
        keep_old: when false (the default) each alias is deleted after its
            data has been migrated.

    Returns:
        ``primary_object``, after it has been saved.

    Raises:
        TypeError: if ``primary_object`` is not a ``Model`` instance or an
            alias is not an instance of the primary object's class.
    """
    # Normalise the aliases to a list. A lone instance (or any other
    # non-iterable value) is wrapped; every other iterable is expanded.
    if not alias_objects:
        alias_objects = []
    elif isinstance(alias_objects, Model) or not hasattr(alias_objects, '__iter__'):
        alias_objects = [alias_objects]
    else:
        alias_objects = list(alias_objects)

    # Check that the primary is a model and that all aliases share its class.
    primary_class = primary_object.__class__
    if not issubclass(primary_class, Model):
        raise TypeError('Only django.db.models.Model subclasses can be merged')
    for alias_object in alias_objects:
        if not isinstance(alias_object, primary_class):
            raise TypeError('Only models of same class can be merged')

    # Collect every GenericForeignKey declared on any installed model.
    # TODO: this is a bit of a hack, since the generics framework should
    # provide a similar method to the ForeignKey field for accessing the
    # generic related fields.
    generic_fields = []
    for model in get_models():
        generic_fields.extend(
            attr for attr in model.__dict__.values()
            if isinstance(attr, GenericForeignKey)
        )

    # Names of the primary object's own fields that are still empty.
    blank_local_fields = {
        field.attname
        for field in primary_object._meta.local_fields
        if getattr(primary_object, field.attname) in (None, '')
    }

    for alias_object in alias_objects:
        # Merging a row into itself would end with the primary being deleted.
        if alias_object.pk is not None and alias_object.pk == primary_object.pk:
            continue

        _migrate_reverse_relations(primary_object, alias_object, keep_old)
        _migrate_many_to_many(primary_object, alias_object)
        _migrate_generic_relations(primary_object, alias_object, generic_fields)

        # A field filled by one alias must not be overwritten by a later one.
        blank_local_fields -= _fill_blank_fields(
            primary_object, alias_object, blank_local_fields)

        if not keep_old:
            alias_object.delete()

    primary_object.save()
    return primary_object


def _migrate_reverse_relations(primary_object, alias_object, keep_old):
    """Re-point reverse FK / one-to-one referrers from the alias to the primary."""
    for relation in alias_object._meta.get_fields():
        # Keep only non-hidden reverse relations created by a ForeignKey or
        # OneToOneField declared on another model.
        if not (relation.auto_created and not relation.concrete):
            continue
        if not (relation.one_to_many or relation.one_to_one):
            continue

        # Attribute name on the alias/primary model.
        accessor = relation.get_accessor_name()
        # Attribute name on the referring model.
        fk_name = relation.field.name

        if relation.one_to_one:
            # A reverse one-to-one accessor yields a single instance, or
            # raises the related model's DoesNotExist when there is no row.
            missing = relation.related_model.DoesNotExist
            try:
                alias_related = getattr(alias_object, accessor)
            except missing:
                continue
            try:
                primary_related = getattr(primary_object, accessor)
            except missing:
                # The primary has no counterpart: hand over the alias's row.
                setattr(alias_related, fk_name, primary_object)
                alias_related.save()
            else:
                # Both sides have a row: fold the alias's row into the
                # primary's instead of re-pointing it.
                merge_model_objects(
                    primary_related, [alias_related], keep_old=keep_old)
            continue

        for referrer in getattr(alias_object, accessor).all():
            setattr(referrer, fk_name, primary_object)
            referrer.save()


def _migrate_many_to_many(primary_object, alias_object):
    """Move many-to-many memberships from the alias to the primary."""
    for relation in alias_object._meta.get_fields():
        if not relation.many_to_many:
            continue
        # Reverse relations are reached through their accessor name; forward
        # fields through the field name itself.
        if relation.auto_created and not relation.concrete:
            accessor = relation.get_accessor_name()
        else:
            accessor = relation.name

        alias_manager = getattr(alias_object, accessor)
        # Materialise the members before the relation is modified.
        members = list(alias_manager.all())
        if members:
            getattr(primary_object, accessor).add(*members)
            alias_manager.remove(*members)


def _migrate_generic_relations(primary_object, alias_object, generic_fields):
    """Re-point generic foreign keys that reference the alias to the primary."""
    for field in generic_fields:
        lookup = {
            field.fk_field: alias_object.pk,
            field.ct_field: field.get_content_type(alias_object),
        }
        for referrer in field.model._default_manager.filter(**lookup):
            try:
                # The nested atomic block keeps a failed save from breaking
                # an enclosing transaction.
                with transaction.atomic():
                    setattr(referrer, field.name, primary_object)
                    referrer.save()
            except IntegrityError:
                # Deliberately ignored: the generic related object already
                # exists on the primary object, so this one is not moved.
                pass


def _fill_blank_fields(primary_object, alias_object, blank_field_names):
    """Copy non-empty alias values into blank primary fields; return the names filled."""
    filled = set()
    for attname in blank_field_names:
        value = getattr(alias_object, attname)
        if value not in (None, '', []):
            setattr(primary_object, attname, value)
            filled.add(attname)
    return filled
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment