Skip to content

Instantly share code, notes, and snippets.

@charettes
Created April 18, 2022 14:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save charettes/3dcdec3bf66257b0299455a70559f47d to your computer and use it in GitHub Desktop.
Save charettes/3dcdec3bf66257b0299455a70559f47d to your computer and use it in GitHub Desktop.
Generic foreign key that allows for a particular related manager to be used during prefetch
class PrefetchedManagerGenericForeignKey(GenericForeignKey):
def __init__(self, manager_name, *args, **kwargs):
self.manager_name = manager_name
super(PrefetchedManagerGenericForeignKey, self).__init__(*args, **kwargs)
def get_content_type_manager(self, content_type):
model_class = content_type.model_class()
return getattr(model_class, self.manager_name, model_class._base_manager)
# This is mostly a copy-paste of the GenericForeignKey implementation
# with a call to `get_content_type_manager` to attempt to retrieve a
# manager with the specified name and fallback to `_base_manager` which
# is what the default implementation does.
def get_prefetch_queryset(self, instances, queryset=None):
if queryset is not None:
raise ValueError("Custom queryset can't be used for this lookup.")
# For efficiency, group the instances by content type and then do one
# query per model
fk_dict = defaultdict(set)
# We need one instance for each group in order to get the right db:
instance_dict = {}
ct_attname = self.model._meta.get_field(self.ct_field).get_attname()
for instance in instances:
# We avoid looking for values if either ct_id or fkey value is None
ct_id = getattr(instance, ct_attname)
if ct_id is not None:
fk_val = getattr(instance, self.fk_field)
if fk_val is not None:
fk_dict[ct_id].add(fk_val)
instance_dict[ct_id] = instance
ret_val = []
for ct_id, fkeys in fk_dict.items():
instance = instance_dict[ct_id]
ct = self.get_content_type(id=ct_id, using=instance._state.db)
# XXX: This is the only part replaced in GFK implementation.
manager = self.get_content_type_manager(ct)
ret_val.extend(manager.filter(pk__in=fkeys))
# For doing the join in Python, we have to match both the FK val and the
# content type, so we use a callable that returns a (fk, class) pair.
def gfk_key(obj):
ct_id = getattr(obj, ct_attname)
if ct_id is None:
return None
else:
model = self.get_content_type(id=ct_id,
using=obj._state.db).model_class()
return (model._meta.pk.get_prep_value(getattr(obj, self.fk_field)),
model)
return (ret_val,
lambda obj: (obj._get_pk_val(), obj.__class__),
gfk_key,
True,
self.cache_attr)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment