Skip to content

Instantly share code, notes, and snippets.

@willstott101
Last active August 29, 2015 14:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save willstott101/d6320ff51e41035aa51c to your computer and use it in GitHub Desktop.
Save willstott101/d6320ff51e41035aa51c to your computer and use it in GitHub Desktop.
Checks prefetched cache of a queryset, and filters/orders in python. To avoid large number of queries.
from operator import attrgetter
def maybe_prefetched(parent, related_manager):
#return related_manager
try:
cache = parent._prefetched_objects_cache
except AttributeError:
# Nothing is prefetched.
return related_manager
if hasattr(related_manager, 'prefetch_cache_name'):
prefetch_cache_name = related_manager.prefetch_cache_name
elif hasattr(related_manager, 'field'):
prefetch_cache_name = related_manager.field.related_query_name()
else:
# For Django Prefetch compatibility
fields = parent.__class__._meta.fields_map.values()
fields = [f for f in fields if f.related_model is related_manager.model]
if len(fields) > 1:
raise KeyError(
'Too many Prefetch managers for model: %r' % related_manager.model)
prefetch_cache_name = fields[0].field.related_query_name()
try:
cache = cache[prefetch_cache_name]
except KeyError:
# This particular field isn't prefetched.
return related_manager
return PythonQuerySet(parent, related_manager, cache)
class BailFakeQuerySet(Exception):
pass
class PythonQuerySet(object):
def __init__(self, parent, related_manager, cache, history=None):
self.parent = parent
self.related_manager = related_manager
self.cache = cache
if history is None:
self.history = tuple() # ('name', args, kwargs)
else:
self.history = history
def _applied_history(self):
qs = self.related_manager.all()
for func, args, kwargs in self.history:
qs = getattr(qs, func)(*args, **kwargs)
return qs
def __getattr__(self, name):
try:
val = super(PythonQuerySet, self).__getattribute__(name)
if callable(val):
def fake_func(*args, **kwargs):
self.history += (name, args, kwargs)
try:
return val(*args, **kwargs)
except BailFakeQuerySet:
return self._applied_history()
return fake_func
else:
return val
except AttributeError:
pass
return getattr(self._applied_history(), name)
def _clone(self, cache=None):
if cache is None:
cache = list(self.cache)
return PythonQuerySet(
self.parent, self.related_manager,
cache, self.history)
# Emulating a QuerySet from here down...
def all(self):
return self._clone()
def __iter__(self):
return iter(self.cache)
def only(self, *args, **kwargs):
return self._clone()
def filter(self, *args, **kwargs):
if args:
raise BailFakeQuerySet
cache = self.cache
for name, val in kwargs.items():
if '__' in name:
# TODO: Support query operators '__gt' etc
raise BailFakeQuerySet
cache = [x for x in cache if getattr(x, name) == val]
return self._clone(cache)
def exclude(self, *args, **kwargs):
if args:
raise BailFakeQuerySet
cache = self.cache
for name, val in kwargs.items():
if '__' in name:
# TODO: Support query operators '__gt' etc
raise BailFakeQuerySet
cache = [x for x in cache if getattr(x, name) != val]
return self._clone(cache)
def get(self, *args, **kwargs):
ret = self.filter(*args, **kwargs)
if ret.count() > 1:
raise self.related_manager.model.MultipleObjectsReturned
elif not ret:
raise self.related_manager.model.DoesNotExist
return ret.first()
def order_by(self, *args, **kwargs):
cache = self.cache
for arg in args[::-1]:
reverse = arg.startswith('-')
if reverse:
arg = arg[1:]
cache = sorted(cache, key=attrgetter(arg), reverse=reverse)
return self._clone(cache)
def values_list(self, *args, **kwargs):
if kwargs.get('flat', False):
if len(args) != 1:
raise BailFakeQuerySet
return [getattr(x, args[0]) for x in self.cache]
else:
return [tuple(getattr(x, a) for a in args) for x in self.cache]
def first(self):
return self.cache[0] if len(self.cache) else None
def last(self):
return self.cache[-1] if len(self.cache) else None
def count(self):
return len(self.cache)
def exists(self):
return bool(self.cache)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment