Skip to content

Instantly share code, notes, and snippets.

@mikalv
Forked from evert0n/fields.py
Created April 9, 2024 14:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mikalv/fb995ff6b01ba2148ebcb672e7edc02d to your computer and use it in GitHub Desktop.
Save mikalv/fb995ff6b01ba2148ebcb672e7edc02d to your computer and use it in GitHub Desktop.
Virtual Field for Django
from django.db.models import Field
from django.conf import settings
from django.utils.decorators import cached_property
class VirtualField(object):
"""
A virtual field, mainly used for caching and seamless computed field retrieval.
This acts both like a (cached) property and a virtual field if supported.
"""
auto_created = False
concrete = False
editable = False
hidden = False
is_relation = False
many_to_many = False
many_to_one = False
one_to_many = False
one_to_one = False
related_model = None
def __init__(self, fget=None, fset=None, cached=False, reflect=None, target=None, target_ignore_none=False, act_as_field=True, cache_when_set=True, verbose_name=None, *args, **kwargs):
"""Initializer for VirtualField.
All arguments are optional.
Keyword arguments:
fget -- Getter function for this field (instance arg is given)
fset -- Setter function for this field (instance and value args are given)
cached -- Specifies whether or not to cache the result of this field once invoked. (default is False)
reflect -- An iterable of key or tuples containing keys pointing to the specified (possibly nested) object attribute. Each key may be a lambda or function, in which case they are called once invoked.
target -- Same as reflect, except that they are not deleted once collected by __get__ and also acts as another cache target when the field's result is being cached.
target_ignore_none -- Specifies whether or not to ignore a target field which value is None. If this is True, fields with value as None are ignored and considered as being not "set".
verbose_name -- A human-readable name for the field.
Example:
@VirtualField(cache=True, reflect=('foo', ('bar', 'baz')), target=('quaz', ('qux', lambda: 'quux' + 'corge'))
def my_field(self): ...
This field, when invoked, will first try to fetch the caches in the following order:
* object.foo
* object.bar.baz
* object.quaz
* object.qux.quuxcorge
(The object is the instance of the owning model/class)
If one of the above attributes are available, the first available one is used as the result. Otherwise, the inner fget method is called to fetch it.
Since the cache attribute is set to True, the result is stored into instance (in the same manner as cached_property) and the fields specified in the `target' attribute.
"""
self.fget, self.fset, self.cached = fget, fset, cached
if fget is not None and hasattr(fget, '__name__'):
self.name = fget.__name__
else:
self.name = None
self.act_as_field = act_as_field
self.cache_when_set = cache_when_set
self.target_ignore_none = target_ignore_none
self.cache_when_set = True
doc = getattr(fget, '__doc__', None)
if doc is not None:
self.__doc__ = doc
self._vf_reflects_old = reflect
self._vf_targets_old = reflect_readonly
super(VirtualField, self).__init__(*args, **kwargs)
self.verbose_name = verbose_name
if not act_as_field:
self.short_description = verbose_name
def _vf_normalize(self, name):
"""The function that does normalization of reflect and/or target argument.
For each item in the reflect/target list,
1. If the item is callable, call it to obtain the actual item (it must return either string or tuple)
2. If the item is None, ignore this item.
3. If the item is a string, take it as a key. make this field reference directly on it.
4. If the item is iterable, make this field follow the keys when fetched.
"""
# This must be called AFTER model is initialized!
# TODO Find out a better way to check if the model is initialized, not just checking whether self.model is set.
assert hasattr(self, 'model')
_str_types = six.string_types
for keys in getattr(self, name):
if callable(keys):
keys = keys()
if keys is None:
continue
elif isinstance(keys, _str_types):
keys = (keys,)
else:
keys = tuple(x() if callable(x) else x for x in keys)
if any(x is None for x in keys):
continue
yield keys
_vf_reflects = cached_property(lambda self: list(self._vf_normalize('_vf_reflects_old')))
_vf_targets = cached_property(lambda self: list(self._vf_normalize('_vf_targets_old')))
def _vf_enumerate(self):
for x in self._vf_reflects:
yield True, x
for x in self._vf_targets:
yield False, x
def getter(self, fget):
"""Specifies the getter of this field.
The usage is similliar to that of Python decorator `property'.
Usage:
@VirtualField(cached=True)
def field(instance):
return sum(x for x in instance.foos)
Or, alternatively:
field = VirtualField(cached=True)
@field.getter()
def field(instance):
return sum(x for x in instance.foos)
"""
self.fget = fget
if self.name is None and fget is not None and hasattr(fget, '__name__'):
self.name = fget.__name__
doc = getattr(fget, '__doc__', None)
if doc is not None:
self.__doc__ = doc
return self
def setter(self, fset, still_cache=True):
"""Specifies the setter of this field.
The usage is similliar to that of Python decorator `property'.
Usage:
@VirtualField()
def lorem(instance):
return "Lorem ipsum dolor sit amet"
@lorem.setter
def lorem(instance, value):
print("You have set lorem to: %s" % (value,))
"""
self.fset = fset
self.cache_when_set = still_cache
return self
def get_attname(self):
return self.name
def get_attname_column(self):
return self.get_attname(), None
def set_attributes_from_name(self, name):
if not self.name:
self.name = name
self.attname, self.column = self.get_attname(), None
self.concrete = False
if self.verbose_name is None and name:
self.verbose_name = name.replace('_', ' ')
if not self.act_as_field:
self.short_description = self.verbose_name
def contribute_to_class(self, cls, name):
"""Applies this field to the `cls' class, with name `name'"""
self.name = name
self.set_attributes_from_name(name)
self.concrete = False # Force non-concrete
self.model = cls
# Django >=1.6 required
if self.act_as_field:
if hasattr(cls._meta, 'add_virtual_field'):
cls._meta.add_virtual_field(self)
else:
try:
cls._meta.add_field(self, virtual=True)
except:
if hasattr(cls._meta, 'virtual_fields'):
cls._meta.virtual_fields.append(self)
else:
# Just act as a property
pass
setattr(cls, name, self)
def _store_cache(self, instance, value):
"""Stores the invocation result `value' to fields referenced by `target'."""
instance.__dict__[self.name] = value
for keys in self._vf_targets:
parent, child = None, instance
for k in keys[:-1]:
parent, child = child, getattr(child, k, None)
if child is None:
break
if child is not None:
setattr(child, keys[-1], value)
def _get_reflects(self, instance, do_cache, do_clean):
"""Fetch all proxies (reflect/target), and optionally copy them to targets and delete them."""
logger.debug("%s trying on %s (%#x, %s): %s" % (self.name, instance, id(instance), instance.__dict__, repr(self._vf_reflects)))
value = self
to_be_cleaned = []
for is_reflect, keys in self._vf_enumerate():
parent, child = None, instance
t = 0
logger.debug("Keys: %s" % (keys,))
for k in keys:
parent, child = child, getattr(child, k, self)
logger.debug("%s To %s: %s" % (" "*t, k, "Invalid" if child is _invalid else getattr(child, '__dict__', child)))
if child is self:
break
t += 1
if child is not instance and child is not self:
logger.debug("%s -> Found!" % (" "*t))
if do_cache:
logger.debug("%s -> Putting cache as instructed.", " "*t)
logger.debug("%s -> BEFORE: %s", " "*t, instance.__dict__)
value = child
logger.debug("%s -> AFTER: %s", " "*t, instance.__dict__)
if do_clean and is_reflect and parent is not None:
logger.debug("%s -> Cleaning as instructed." % (" "*t))
to_be_cleaned.append((parent, k))
yield child
# Store cache after the loop so the targets are not polluted in the loop.
if value is not self:
self._store_cache(instance, child)
for parent, k in to_be_cleaned:
try:
delattr(parent, k)
except AttributeError:
pass
def is_cached(self, instance):
"""Determine if this field would return a cached value on `instance'."""
if self.name in instance.__dict__:
return True
for x in self._get_reflects(instance, False, False):
# Encountered any valid cache store
return True
return False
def cleanup_reflects(self, instance):
"""Cleans up (delattr()s) all `reflect's on `instance'.
Note: this doesn't clean `target' attributes, which makes this field still cached."""
for x in self._get_reflects(instance, False, True): pass
def refresh(self, instance, *args, **kwargs):
"""Refreshes the field immediately. This accepts additional arguments which are passed directly to the getter."""
self.cleanup_reflects(instance)
self._store_cache(instance, self.fget(instance, *args, **kwargs))
__call__ = getter # Acts as getter
def __get__(self, instance, owner=None):
if instance is None:
return self
if self.name in instance.__dict__:
# Not taking very advantage of cache :/
return instance.__dict__[self.name]
if self.fget is None:
raise AttributeError("This virtual field is not readable")
# Iterate over all redirects.
caches = self._get_reflects(instance, self.cached, True)
try:
cache = next(caches)
except StopIteration:
if settings.DEBUG and self._vf_reflects:
logger.debug("CACHE: MISS")
logger.debug(instance.__dict__)
# No reflects!
value = self.fget(instance)
if self.cached:
instance.__dict__[self.name] = value
return value
else:
logger.debug("CACHE: FOUND")
# Clean-up all other reflects
for x in caches: pass
# Return the first found cache
return cache
def __set__(self, instance, value):
if instance is None:
raise ValueError("instance is None")
if self.cache_when_set:
instance.__dict__[self.name] = value
self._store_cache(instance, value)
if self.fset is not None:
self.fset(instance, value)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment