Created
March 7, 2018 13:27
-
-
Save nehajagadeesh/170302f52a5126c39d1846e828f35c3f to your computer and use it in GitHub Desktop.
Generic Contenttypes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import unicode_literals | |
from collections import defaultdict | |
from django.core import checks | |
from django.core.exceptions import FieldDoesNotExist, ObjectDoesNotExist | |
from django.db import models | |
from django.utils.encoding import python_2_unicode_compatible | |
from custom_contenttypes import CustomContentType | |
@python_2_unicode_compatible
class CustomGenericForeignKey(object):
    """
    Provide a generic many-to-one relation through the ``content_type`` and
    ``object_id`` fields.

    This class also doubles as an accessor to the related object (similar to
    ForwardManyToOneDescriptor) by adding itself as a model attribute.

    Content types are resolved through ``CustomContentType`` rather than
    Django's stock ``ContentType`` model.
    """

    # Field flags
    auto_created = False
    concrete = False
    editable = False
    hidden = False

    is_relation = True
    many_to_many = False
    many_to_one = True
    one_to_many = False
    one_to_one = False
    related_model = None
    remote_field = None

    def __init__(self, ct_field='content_type', fk_field='object_id', for_concrete_model=True):
        """
        Store the names of the two model fields backing the relation.

        ``ct_field`` names the field holding the content type, ``fk_field``
        names the field holding the related object's primary key, and
        ``for_concrete_model`` is forwarded to ``get_for_model()`` whenever a
        content type is looked up from an object.
        """
        self.ct_field = ct_field
        self.fk_field = fk_field
        self.for_concrete_model = for_concrete_model
        self.editable = False
        self.rel = None
        self.column = None

    def contribute_to_class(self, cls, name, **kwargs):
        """
        Attach this field to the model class ``cls`` under ``name``.

        Records the name and model, derives the per-instance cache attribute
        name, registers the field as a private field on ``cls._meta`` and
        installs this object as the class attribute so that it acts as a
        descriptor.
        """
        self.name = name
        self.model = cls
        self.cache_attr = "_%s_cache" % name
        cls._meta.add_field(self, private=True)
        setattr(cls, name, self)

    def get_filter_kwargs_for_object(self, obj):
        """See corresponding method on Field"""
        return {
            self.fk_field: getattr(obj, self.fk_field),
            self.ct_field: getattr(obj, self.ct_field),
        }

    def get_forward_related_filter(self, obj):
        """See corresponding method on RelatedField"""
        return {
            self.fk_field: obj.pk,
            self.ct_field: CustomContentType.objects.get_for_model(obj).pk,
        }

    def __str__(self):
        """Return ``app_label.ModelName.field_name`` for this field."""
        model = self.model
        app = model._meta.app_label
        return '%s.%s.%s' % (app, model._meta.object_name, self.name)

    def check(self, **kwargs):
        """
        Run all checks for this field and return the collected errors.

        The result is a (possibly empty) list of ``checks.Error`` objects.
        """
        errors = []
        errors.extend(self._check_field_name())
        errors.extend(self._check_object_id_field())
        errors.extend(self._check_content_type_field())
        return errors

    def _check_field_name(self):
        """Return an error list if the field name ends with an underscore."""
        if self.name.endswith("_"):
            return [
                checks.Error(
                    'Field names must not end with an underscore.',
                    obj=self,
                    id='fields.E001',
                )
            ]
        else:
            return []

    def _check_object_id_field(self):
        """Return an error list if the model has no field named ``fk_field``."""
        try:
            self.model._meta.get_field(self.fk_field)
        except FieldDoesNotExist:
            return [
                checks.Error(
                    "The GenericForeignKey object ID references the non-existent field '%s'." % self.fk_field,
                    obj=self,
                    id='contenttypes.E001',
                )
            ]
        else:
            return []

    def _check_content_type_field(self):
        """
        Check that the model field named by ``self.ct_field`` exists and is a
        valid content type field, i.e. a ForeignKey to ``CustomContentType``.

        Return a list containing a single ``checks.Error`` describing the
        first problem found, or an empty list when the field is valid.
        """
        try:
            field = self.model._meta.get_field(self.ct_field)
        except FieldDoesNotExist:
            return [
                checks.Error(
                    "The GenericForeignKey content type references the non-existent field '%s.%s'." % (
                        self.model._meta.object_name, self.ct_field
                    ),
                    obj=self,
                    id='contenttypes.E002',
                )
            ]
        else:
            if not isinstance(field, models.ForeignKey):
                return [
                    checks.Error(
                        "'%s.%s' is not a ForeignKey." % (
                            self.model._meta.object_name, self.ct_field
                        ),
                        hint=(
                            "GenericForeignKeys must use a ForeignKey to "
                            "'custom_contenttypes.CustomContentType' as the 'content_type' field."
                        ),
                        obj=self,
                        id='contenttypes.E003',
                    )
                ]
            elif field.remote_field.model != CustomContentType:
                return [
                    checks.Error(
                        "'%s.%s' is not a ForeignKey to 'custom_contenttypes.CustomContentType'." % (
                            self.model._meta.object_name, self.ct_field
                        ),
                        hint=(
                            "GenericForeignKeys must use a ForeignKey to "
                            "'custom_contenttypes.CustomContentType' as the 'content_type' field."
                        ),
                        obj=self,
                        id='contenttypes.E004',
                    )
                ]
            else:
                return []

    def get_content_type(self, obj=None, id=None, using=None):
        """
        Return the ``CustomContentType`` for a model instance or for an id.

        When ``obj`` is given, the lookup uses the database ``obj`` was loaded
        from (``obj._state.db``) and ``using`` is ignored. Otherwise ``id`` is
        looked up on the database alias ``using``.

        Raise ``Exception`` when neither ``obj`` nor ``id`` is supplied.
        """
        if obj is not None:
            return CustomContentType.objects.db_manager(obj._state.db).get_for_model(
                obj, for_concrete_model=self.for_concrete_model)
        elif id is not None:
            return CustomContentType.objects.db_manager(using).get_for_id(id)
        else:
            # Callers in this class always pass one of ``obj`` or ``id``.
            raise Exception("Impossible arguments to GFK.get_content_type!")

    def get_prefetch_queryset(self, instances, queryset=None):
        """
        Fetch the related objects for ``instances``, grouped by content type.

        Return the 5-tuple ``(related_objects, related_key, instance_key,
        True, self.name)`` where the two key callables produce matching
        ``(pk, model class)`` pairs used to join the results in Python.

        Raise ``ValueError`` when a custom ``queryset`` is supplied.
        """
        if queryset is not None:
            raise ValueError("Custom queryset can't be used for this lookup.")

        # For efficiency, group the instances by content type and then do one
        # query per model
        fk_dict = defaultdict(set)
        # We need one instance for each group in order to get the right db:
        instance_dict = {}
        ct_attname = self.model._meta.get_field(self.ct_field).get_attname()
        for instance in instances:
            # We avoid looking for values if either ct_id or fkey value is None
            ct_id = getattr(instance, ct_attname)
            if ct_id is not None:
                fk_val = getattr(instance, self.fk_field)
                if fk_val is not None:
                    fk_dict[ct_id].add(fk_val)
                    instance_dict[ct_id] = instance

        ret_val = []
        for ct_id, fkeys in fk_dict.items():
            instance = instance_dict[ct_id]
            ct = self.get_content_type(id=ct_id, using=instance._state.db)
            ret_val.extend(ct.get_all_objects_for_this_type(pk__in=fkeys))

        # For doing the join in Python, we have to match both the FK val and the
        # content type, so we use a callable that returns a (fk, class) pair.
        def gfk_key(obj):
            ct_id = getattr(obj, ct_attname)
            if ct_id is None:
                return None
            else:
                model = self.get_content_type(id=ct_id,
                                              using=obj._state.db).model_class()
                return (model._meta.pk.get_prep_value(getattr(obj, self.fk_field)),
                        model)

        return (ret_val,
                lambda obj: (obj._get_pk_val(), obj.__class__),
                gfk_key,
                True,
                self.name)

    def is_cached(self, instance):
        """Return True if ``instance`` already carries the cache attribute."""
        return hasattr(instance, self.cache_attr)

    def __get__(self, instance, cls=None):
        """
        Return the related object for ``instance``, or this field itself when
        accessed on the class.

        A cached object is reused only while it still matches the instance's
        current content type id and object id; otherwise the object is looked
        up again. The result (``None`` when nothing is found) is stored on the
        instance under ``self.cache_attr``.
        """
        if instance is None:
            return self

        # Don't use getattr(instance, self.ct_field) here because that might
        # reload the same content type over and over (#5570). Instead, get the
        # content type ID here, and later when the actual instance is needed,
        # use CustomContentType.objects.get_for_id() via get_content_type().
        f = self.model._meta.get_field(self.ct_field)
        ct_id = getattr(instance, f.get_attname(), None)
        pk_val = getattr(instance, self.fk_field)

        try:
            rel_obj = getattr(instance, self.cache_attr)
        except AttributeError:
            rel_obj = None
        else:
            # Compare against None explicitly rather than relying on
            # truthiness: a cached object whose class defines a falsy
            # __bool__/__nonzero__/__len__ must still be validated, otherwise
            # a stale object would be returned below.
            if rel_obj is not None and (
                    ct_id != self.get_content_type(obj=rel_obj, using=instance._state.db).id or
                    rel_obj._meta.pk.to_python(pk_val) != rel_obj._get_pk_val()):
                rel_obj = None

        if rel_obj is not None:
            return rel_obj

        if ct_id is not None:
            ct = self.get_content_type(id=ct_id, using=instance._state.db)
            try:
                rel_obj = ct.get_object_for_this_type(pk=pk_val)
            except ObjectDoesNotExist:
                # A dangling reference resolves to None instead of raising.
                pass
        setattr(instance, self.cache_attr, rel_obj)
        return rel_obj

    def __set__(self, instance, value):
        """
        Point ``instance`` at ``value``.

        Sets the content type field and the object id field from ``value``
        (both become ``None`` when ``value`` is ``None``) and stores ``value``
        in the per-instance cache.
        """
        ct = None
        fk = None
        if value is not None:
            ct = self.get_content_type(obj=value)
            fk = value._get_pk_val()

        setattr(instance, self.ct_field, ct)
        setattr(instance, self.fk_field, fk)
        setattr(instance, self.cache_attr, value)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment