Django : clone model instance on update via a descriptor (used as a decorator on `save` method)
class clone_on_update(object): | |
"""Descriptor to clone the model instance when any field | |
is updated. This is meant to be set as a decorator on `save` | |
method of the model. | |
Initialization Args: | |
- `condition` (str/callable) [Optional] (default: None) | |
- `active_status_field` (str) [Optional] (default: None) | |
- `update_one_to_one` (bool) [Optional] (default: True) | |
#### condition | |
An optional argument `condition` can be passed to do the | |
cloning conditionally, the cloning only takes place if | |
`condition` is a trythy value. `condition` can also be a | |
callable, which in turn is called to get the boolean status. | |
A value of `None` indicates the cloning should be done | |
unconditionally. | |
The model object can also be referred in the condition using | |
the string format `self.<attribute_chain>`. The object *must* | |
be referred with the string `self`. The final `attribute` can | |
also be a callable. For example: | |
@clone_on_update(condition='self.foobar.exists') # callable | |
def save(self, *args, **kwargs): | |
... | |
@clone_on_update(condition='self.is_valid') # property | |
def save(self, *args, **kwargs): | |
... | |
You can't refer to the object in condition while the decorator | |
is called because it does not exist at that point, so this | |
approach can be used to refer to any object attribute if needed. | |
Also note that, only attribute accesses can be used as the | |
condition string and the string **must** start with `self.` to | |
get this feature (as regular callables can be used directly). | |
#### active_status_field | |
`active_status_field` can be the name of a BooleanField which | |
would be set to False on the old instance and will be set to | |
True on new instance. | |
#### update_one_to_one | |
If `update_one_to_one` is set to True, all related one-to-one | |
fields are updated (using `clone_on_update`) and the related | |
one-to-one instances are set to the newly created instances. | |
If set to False, all related one-to-one fields are set to None, | |
so the fields must be NULL-able in that case. | |
**NOTE**: Relations that are not defined on the model as | |
fields, are not copied to the new instance and hence | |
needed to be assigned manually. | |
""" | |
def __init__( | |
self, | |
condition=None, | |
active_status_field=None, | |
update_one_to_one=True, | |
): | |
self.condition = condition | |
if active_status_field is not None: | |
if not isinstance(active_status_field, basestring): | |
raise TypeError( | |
'active_status_field must be a string.' | |
) | |
self.active_status_field = active_status_field | |
self.update_one_to_one = update_one_to_one | |
self.save_func = None | |
self.obj = None | |
if self._is_save_func(condition): | |
self.save_func = condition | |
self.condition = None | |
def __call__(self, save_func): | |
if not self._is_save_func(save_func): | |
raise ValueError( | |
'save_func must be the `save` method on the model class.' | |
) | |
self.save_func = save_func | |
return self | |
def _is_save_func(self, func): | |
"""Returns whether `func` is a callable named `save`.""" | |
return ( | |
isinstance(func, (types.FunctionType, types.MethodType)) | |
and func.__name__ == 'save' | |
) | |
@property | |
def _condition_is_met(self): | |
"""Returns whether the condition is met i.e. | |
is truthy or evaluates to truthy value. | |
""" | |
if self.condition is None: | |
return True | |
if isinstance(self.condition, basestring): | |
self.condition = self._condition_from_str | |
if callable(self.condition): | |
return self.condition() | |
return self.condition | |
@property | |
def _condition_from_str(self): | |
"""If the condition is in format `self.<attribute>`, | |
parses that for object attributes and returns the | |
eventual object. Otherwise returns the condition | |
string as-is. | |
""" | |
condition = self.condition | |
if not condition.startswith('self.'): | |
return condition | |
if self.obj is None: | |
return condition | |
condition_obj = self.obj | |
splitted_attrs = condition.split('.')[1:] | |
for attr in splitted_attrs: | |
condition_obj = getattr(condition_obj, attr) | |
return condition_obj | |
def _get_fields(self, obj): | |
"""Returns a tuple of (one_to_one_fields, | |
m2m_fields, all_fields_without_m2m_fields) | |
belonging to the model object `obj`. | |
""" | |
m2m_fields = obj._meta.many_to_many | |
all_fields_without_m2m_fields = obj._meta.fields | |
one_to_one_fields = [ | |
field | |
for field in all_fields_without_m2m_fields | |
if isinstance(field, models.OneToOneField) | |
] | |
return one_to_one_fields, m2m_fields, all_fields_without_m2m_fields | |
def __get__(self, obj, cls=None): | |
if self.save_func is None: | |
raise ValueError( | |
'No `save` method found.' | |
) | |
if obj is None: | |
return self.save_func | |
self.obj = obj | |
def inner(*args, **kwargs): | |
if obj.pk is not None and self._condition_is_met: | |
one_to_one_fields, m2m_fields, _ = self._get_fields(obj) | |
# PK | |
orig_pk = obj.pk | |
obj.pk = None | |
# ID | |
try: | |
_ = obj.id | |
except AttributeError: | |
pass | |
else: | |
obj.id = None | |
if self.active_status_field is not None: | |
setattr(obj, self.active_status_field, True) | |
self._update_one_to_one_fields(obj, one_to_one_fields) | |
obj.save() | |
if self.active_status_field is not None: | |
obj.__class__.objects.filter(pk=orig_pk).update( | |
**{self.active_status_field: False} | |
) | |
self._update_many_to_many_fields(orig_pk, obj, m2m_fields) | |
self.save_func(obj, *args, **kwargs) | |
return inner | |
def _update_one_to_one_fields(self, obj, one_to_one_fields): | |
"""Updates all related one to one fields.""" | |
if self.update_one_to_one: | |
self._full_update_one_to_one_fields(obj, one_to_one_fields) | |
else: | |
for field in one_to_one_fields: | |
setattr(obj, field.name, None) | |
def _full_update_one_to_one_fields(self, obj, one_to_one_fields): | |
"""Run clone_on_update on each related one to one fields.""" | |
for field in one_to_one_fields: | |
orig_one_to_one_obj = getattr(obj, field.name) | |
setattr( | |
orig_one_to_one_obj.__class__, | |
'save', | |
self.__class__(orig_one_to_one_obj.__class__.save) | |
) | |
orig_one_to_one_obj.save(orig_one_to_one_obj) | |
# orig_one_to_one_obj is now a new one | |
setattr(obj, field.name, orig_one_to_one_obj) | |
def _update_many_to_many_fields(self, orig_pk, obj, m2m_fields): | |
"""Updates all related many-to-many fields.""" | |
if m2m_fields: | |
orig_obj = obj.__class__.objects.get(pk=orig_pk) | |
for field in m2m_fields: | |
orig_m2m_relation_obj = getattr(orig_obj, field.name) | |
new_m2m_relation_obj = getattr(obj, field.name) | |
new_m2m_relation_obj.set(orig_m2m_relation_obj.all()) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment