Skip to content

Instantly share code, notes, and snippets.

@nehajagadeesh
Created March 7, 2018 13:27
Show Gist options
  • Save nehajagadeesh/170302f52a5126c39d1846e828f35c3f to your computer and use it in GitHub Desktop.
Save nehajagadeesh/170302f52a5126c39d1846e828f35c3f to your computer and use it in GitHub Desktop.
Generic Contenttypes
from __future__ import unicode_literals
from collections import defaultdict
from django.core import checks
from django.core.exceptions import FieldDoesNotExist, ObjectDoesNotExist
from django.db import models
from django.utils.encoding import python_2_unicode_compatible
from custom_contenttypes import CustomContentType
@python_2_unicode_compatible
class CustomGenericForeignKey(object):
    """
    Provide a generic many-to-one relation through the ``content_type`` and
    ``object_id`` fields.

    This class also doubles as an accessor to the related object (similar to
    ForwardManyToOneDescriptor) by adding itself as a model attribute.

    Content types are looked up through ``CustomContentType`` rather than
    Django's stock ``ContentType`` model.
    """

    # Field flags
    auto_created = False
    concrete = False
    editable = False
    hidden = False

    is_relation = True
    many_to_many = False
    many_to_one = True
    one_to_many = False
    one_to_one = False
    related_model = None
    remote_field = None

    def __init__(self, ct_field='content_type', fk_field='object_id', for_concrete_model=True):
        """
        Store the names of the two model fields backing this relation.

        ``ct_field`` is the name of the ForeignKey to ``CustomContentType``,
        ``fk_field`` the name of the field holding the related object's
        primary key. ``for_concrete_model`` is forwarded to
        ``get_for_model()`` when resolving the content type of an object.
        """
        self.ct_field = ct_field
        self.fk_field = fk_field
        self.for_concrete_model = for_concrete_model
        self.editable = False
        self.rel = None
        self.column = None

    def contribute_to_class(self, cls, name, **kwargs):
        """
        Register this field on ``cls`` as a private field and install it as
        the descriptor for attribute ``name``.
        """
        self.name = name
        self.model = cls
        # Name of the per-instance attribute that caches the related object.
        self.cache_attr = "_%s_cache" % name
        cls._meta.add_field(self, private=True)
        setattr(cls, name, self)

    def get_filter_kwargs_for_object(self, obj):
        """See corresponding method on Field"""
        return {
            self.fk_field: getattr(obj, self.fk_field),
            self.ct_field: getattr(obj, self.ct_field),
        }

    def get_forward_related_filter(self, obj):
        """See corresponding method on RelatedField"""
        return {
            self.fk_field: obj.pk,
            self.ct_field: CustomContentType.objects.get_for_model(obj).pk,
        }

    def __str__(self):
        """Return the field's dotted label: ``app_label.ModelName.field_name``."""
        model = self.model
        app = model._meta.app_label
        return '%s.%s.%s' % (app, model._meta.object_name, self.name)

    def check(self, **kwargs):
        """Run all system checks for this field and return the list of errors."""
        errors = []
        errors.extend(self._check_field_name())
        errors.extend(self._check_object_id_field())
        errors.extend(self._check_content_type_field())
        return errors

    def _check_field_name(self):
        """Return an error list if the field name ends with an underscore."""
        if self.name.endswith("_"):
            return [
                checks.Error(
                    'Field names must not end with an underscore.',
                    obj=self,
                    id='fields.E001',
                )
            ]
        else:
            return []

    def _check_object_id_field(self):
        """Return an error list if ``fk_field`` does not exist on the model."""
        try:
            self.model._meta.get_field(self.fk_field)
        except FieldDoesNotExist:
            return [
                checks.Error(
                    "The GenericForeignKey object ID references the non-existent field '%s'." % self.fk_field,
                    obj=self,
                    id='contenttypes.E001',
                )
            ]
        else:
            return []

    def _check_content_type_field(self):
        """
        Check if field named `field_name` in model `model` exists and is a
        valid content_type field (is a ForeignKey to CustomContentType).
        """
        try:
            field = self.model._meta.get_field(self.ct_field)
        except FieldDoesNotExist:
            return [
                checks.Error(
                    "The GenericForeignKey content type references the non-existent field '%s.%s'." % (
                        self.model._meta.object_name, self.ct_field
                    ),
                    obj=self,
                    id='contenttypes.E002',
                )
            ]
        else:
            if not isinstance(field, models.ForeignKey):
                return [
                    checks.Error(
                        "'%s.%s' is not a ForeignKey." % (
                            self.model._meta.object_name, self.ct_field
                        ),
                        hint=(
                            "GenericForeignKeys must use a ForeignKey to "
                            "'custom_contenttypes.CustomContentType' as the 'content_type' field."
                        ),
                        obj=self,
                        id='contenttypes.E003',
                    )
                ]
            elif field.remote_field.model != CustomContentType:
                return [
                    checks.Error(
                        "'%s.%s' is not a ForeignKey to 'custom_contenttypes.CustomContentType'." % (
                            self.model._meta.object_name, self.ct_field
                        ),
                        hint=(
                            "GenericForeignKeys must use a ForeignKey to "
                            "'custom_contenttypes.CustomContentType' as the 'content_type' field."
                        ),
                        obj=self,
                        id='contenttypes.E004',
                    )
                ]
            else:
                return []

    def get_content_type(self, obj=None, id=None, using=None):
        """
        Return the ``CustomContentType`` for a model instance or for an ID.

        When ``obj`` is given, the lookup runs on the database ``obj`` was
        loaded from and ``using`` is ignored. Otherwise ``id`` is looked up
        on the ``using`` database. Raise ``Exception`` if neither ``obj``
        nor ``id`` is supplied.
        """
        if obj is not None:
            return CustomContentType.objects.db_manager(obj._state.db).get_for_model(
                obj, for_concrete_model=self.for_concrete_model)
        elif id is not None:
            return CustomContentType.objects.db_manager(using).get_for_id(id)
        else:
            # Callers inside this class always pass one of ``obj`` or ``id``;
            # reaching this branch means the method was called incorrectly.
            raise Exception("Impossible arguments to GFK.get_content_type!")

    def get_prefetch_queryset(self, instances, queryset=None):
        """
        Fetch the related objects for ``instances``, one query per content
        type, and return the 5-tuple built at the end of this method:
        (related objects, key function for a related object, key function
        for an instance, ``True``, this field's name).

        Raise ``ValueError`` if a custom ``queryset`` is passed.
        """
        if queryset is not None:
            raise ValueError("Custom queryset can't be used for this lookup.")

        # For efficiency, group the instances by content type and then do one
        # query per model
        fk_dict = defaultdict(set)
        # We need one instance for each group in order to get the right db:
        instance_dict = {}
        ct_attname = self.model._meta.get_field(self.ct_field).get_attname()
        for instance in instances:
            # We avoid looking for values if either ct_id or fkey value is None
            ct_id = getattr(instance, ct_attname)
            if ct_id is not None:
                fk_val = getattr(instance, self.fk_field)
                if fk_val is not None:
                    fk_dict[ct_id].add(fk_val)
                    instance_dict[ct_id] = instance

        ret_val = []
        for ct_id, fkeys in fk_dict.items():
            instance = instance_dict[ct_id]
            ct = self.get_content_type(id=ct_id, using=instance._state.db)
            ret_val.extend(ct.get_all_objects_for_this_type(pk__in=fkeys))

        # For doing the join in Python, we have to match both the FK val and the
        # content type, so we use a callable that returns a (fk, class) pair.
        def gfk_key(obj):
            ct_id = getattr(obj, ct_attname)
            if ct_id is None:
                return None
            else:
                model = self.get_content_type(id=ct_id,
                                              using=obj._state.db).model_class()
                return (model._meta.pk.get_prep_value(getattr(obj, self.fk_field)),
                        model)

        return (ret_val,
                lambda obj: (obj._get_pk_val(), obj.__class__),
                gfk_key,
                True,
                self.name)

    def is_cached(self, instance):
        """Return True if ``instance`` already carries a cached related object."""
        return hasattr(instance, self.cache_attr)

    def __get__(self, instance, cls=None):
        """
        Return the related object for ``instance``, or ``None`` if there is
        no content type set or the referenced row does not exist.

        Accessed on the class (``instance is None``), return the descriptor
        itself. The result is cached on the instance; a cached object is
        reused only while its content type and primary key still match the
        values currently stored on ``instance``.
        """
        if instance is None:
            return self

        # Don't use getattr(instance, self.ct_field) here because that might
        # reload the same ContentType over and over (#5570). Instead, get the
        # content type ID here, and later when the actual instance is needed,
        # use ContentType.objects.get_for_id(), which has a global cache.
        f = self.model._meta.get_field(self.ct_field)
        ct_id = getattr(instance, f.get_attname(), None)
        pk_val = getattr(instance, self.fk_field)

        rel_obj = getattr(instance, self.cache_attr, None)
        # Compare against None explicitly: a model instance may evaluate as
        # falsy (e.g. it defines __len__ or __bool__), and a truthiness test
        # would then skip the staleness check and hand back a stale object.
        if rel_obj is not None:
            stale = (
                ct_id != self.get_content_type(obj=rel_obj, using=instance._state.db).id or
                rel_obj._meta.pk.to_python(pk_val) != rel_obj._get_pk_val()
            )
            if not stale:
                return rel_obj
            rel_obj = None

        if ct_id is not None:
            ct = self.get_content_type(id=ct_id, using=instance._state.db)
            try:
                rel_obj = ct.get_object_for_this_type(pk=pk_val)
            except ObjectDoesNotExist:
                # A dangling reference is deliberately reported as None
                # rather than raised to the caller.
                pass
        setattr(instance, self.cache_attr, rel_obj)
        return rel_obj

    def __set__(self, instance, value):
        """
        Point ``instance`` at ``value``: set the content type and object ID
        fields from it (both ``None`` when ``value`` is ``None``) and cache
        ``value`` on the instance.
        """
        ct = None
        fk = None
        if value is not None:
            ct = self.get_content_type(obj=value)
            fk = value._get_pk_val()

        setattr(instance, self.ct_field, ct)
        setattr(instance, self.fk_field, fk)
        setattr(instance, self.cache_attr, value)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment