Created
March 29, 2017 16:22
-
-
Save Xowap/97060dbd4b6db378a9e815b8849178f8 to your computer and use it in GitHub Desktop.
Django Upsert
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import namedtuple | |
def auto_dict(val):
    # type: (Union[Dict, NamedTuple]) -> Dict
    """
    Transforms named tuples into dict. Does nothing if `val` is already a dict.

    Named tuples are converted through their `_asdict()` method, since their
    instances do not reliably expose a `__dict__` attribute. Any other object
    that has a `__dict__` is converted using it. Values that match none of
    these cases are returned unchanged.

    :param val: a NamedTuple instance or a dict
    :return: dict version of val
    """

    if isinstance(val, dict):
        return val

    # Named tuples (collections.namedtuple / typing.NamedTuple) provide
    # `_asdict()`; prefer it over `__dict__`, which they may not have.
    as_dict = getattr(val, '_asdict', None)
    if callable(as_dict):
        return dict(as_dict())

    if hasattr(val, '__dict__'):
        return val.__dict__

    # noinspection PyTypeChecker
    return val
# noinspection PyTypeChecker
def upsert(cls, key_fields, data):
    # type: (Type[models.Model], List[AnyStr], Iterable[Union[Dict, NamedTuple]]) -> None
    """
    Upsert data using the Django ORM in an optimized manner.

    It will look for existing data in DB and update it row per row only if
    needed. Then all the data that was not found will be created in a bulk
    statement.

    Warning: no validation is done on data, you need to make sure that what
    you send is correct.

    In order to find existing data, it will use the `key_fields`: if all the
    listed fields are equal then objects are considered to be the same. In
    order to speedup the DB lookup, only the first field will be used. Because
    of that, you'll get a much better performance if you use "program id" as
    first field instead of "lang".

    If several items of `data` share the same key, the last one wins.

    This will also update HStoreFields, in the sense that if both the stored
    value and the new value are dictionaries then the stored dictionary is
    updated with the new entries instead of being replaced. The object is only
    saved if at least one entry actually changed.

    >>> data = [{'iso_code': 'FR', 'label': {'fr': 'France'}}]
    >>> upsert(Country, ['iso_code'], data)

    :param cls: class of the model
    :param key_fields: name of the fields that provide uniqueness
    :param data: a list of data (dicts or named tuples)
    """

    # Index the incoming rows by their key (tuple of the key field values).
    expected = {}
    for row in map(auto_dict, data):
        expected[tuple(row.get(field) for field in key_fields)] = row

    if not expected:
        # Nothing to update nor to create.
        return

    # Only the first key field is used for the DB lookup; its values are
    # de-duplicated so the IN clause stays as small as possible.
    lookup_field = key_fields[0]
    lookup_values = list(set(key[0] for key in expected))

    actual = {}
    for obj in cls.objects.filter(**{lookup_field + '__in': lookup_values}):
        actual[tuple(getattr(obj, field, None) for field in key_fields)] = obj

    to_add = []

    for key, content in expected.items():
        obj = actual.get(key)

        if obj is None:
            to_add.append(content)
            continue

        has_diff = False

        for name, value in content.items():
            current = getattr(obj, name)

            if isinstance(current, dict) and isinstance(value, dict):
                # Merge dictionaries, and only flag the object as modified if
                # the merge really changes something.
                changed = {
                    k: v for k, v in value.items()
                    if k not in current or current[k] != v
                }

                if changed:
                    current.update(changed)
                    has_diff = True
            elif current != value:
                setattr(obj, name, value)
                has_diff = True

        if has_diff:
            obj.save()

    if to_add:
        # noinspection PyCallingNonCallable
        cls.objects.bulk_create([cls(**x) for x in to_add])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment