Skip to content

Instantly share code, notes, and snippets.

@Xowap
Created March 29, 2017 16:22
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Xowap/97060dbd4b6db378a9e815b8849178f8 to your computer and use it in GitHub Desktop.
Save Xowap/97060dbd4b6db378a9e815b8849178f8 to your computer and use it in GitHub Desktop.
Django Upsert
from collections import namedtuple
def auto_dict(val):
    # type: (Union[Dict, NamedTuple]) -> Dict
    """
    Return a dict representation of `val`.

    - dicts are returned unchanged (same object, no copy);
    - named tuples (anything exposing a callable `_asdict`) are converted through `_asdict()`;
    - other objects carrying a `__dict__` yield that `__dict__` (the live mapping, not a copy);
    - anything else is returned as-is.

    :param val: a NamedTuple instance or a dict
    :return: dict version of val
    """

    if isinstance(val, dict):
        return val

    # `_asdict()` is the public named tuple API. Relying on `__dict__` is not enough: named tuple
    # instances do not expose it on current Python 3, and a subclass declared without `__slots__`
    # exposes an *empty* one, which would silently drop every field.
    as_dict = getattr(val, '_asdict', None)
    if callable(as_dict):
        return as_dict()

    if hasattr(val, '__dict__'):
        return val.__dict__

    # noinspection PyTypeChecker
    return val
# noinspection PyTypeChecker
def upsert(cls, key_fields, data):
    # type: (Type[models.Model], List[AnyStr], Iterable[Union[Dict, NamedTuple]]) -> None
    """
    Upsert data using the Django ORM in an optimized manner.

    Existing rows are looked up in the DB and updated one by one, and only when at least one value
    actually changes. Every row that was not found is then created in a single bulk statement.

    Warning: no validation is done on data, you need to make sure that what you send is correct.
    This function does not open a transaction itself; wrap the call if you need atomicity.

    Existing rows are matched with `key_fields`: two rows are the same when all the listed fields
    are equal. To keep the DB lookup simple, only the *first* key field is used to filter the
    query (`<first field>__in=[...]`); the remaining key fields are compared in Python. Put the
    most selective field first to fetch as few candidate rows as possible.

    When both the stored value and the incoming value of a field are dicts (e.g. a HStoreField),
    the incoming keys are merged into the stored dict instead of replacing it. In every other
    case the incoming value replaces the stored one.

    If several items of `data` share the same key, the last one wins.

    >>> data = [{'iso_code': 'FR', 'label': {'fr': 'France'}}]
    >>> upsert(Country, ['iso_code'], data)

    :param cls: class of the model
    :param key_fields: names of the fields that provide uniqueness (must not be empty)
    :param data: an iterable of dicts or named tuples, one per row
    """

    # Index the incoming rows by key. Plain tuples are used so that any field name is accepted
    # as a key (a namedtuple would reject names such as `_order` or duplicated names).
    expected = {}
    for row in map(auto_dict, data):
        expected[tuple(row.get(field) for field in key_fields)] = row

    lookup_field = key_fields[0]

    if not expected:
        return

    # Candidate rows: everything sharing a first-key-field value with the incoming data.
    candidates = cls.objects.filter(**{
        lookup_field + '__in': [key[0] for key in expected],
    })

    actual = {}
    for existing in candidates:
        actual[tuple(getattr(existing, field, None) for field in key_fields)] = existing

    to_add = []

    for key, content in expected.items():
        if key not in actual:
            to_add.append(content)
            continue

        obj = actual[key]

        if _upsert_apply(obj, content):
            obj.save()

    if to_add:
        # A list (not a generator) so that emptiness/length checks on the argument are reliable.
        # noinspection PyCallingNonCallable
        cls.objects.bulk_create([cls(**row) for row in to_add])


def _upsert_apply(obj, content):
    # type: (Any, Dict) -> bool
    """
    Copy the values of `content` onto the attributes of `obj`.

    :param obj: object to update in place
    :param content: mapping of attribute name to new value
    :return: True if at least one attribute was modified, False if `obj` was already up to date
    """

    changed = False

    for name, value in content.items():
        current = getattr(obj, name)

        if isinstance(current, dict) and isinstance(value, dict):
            # Merge semantics: only the incoming keys matter, extra stored keys are kept. The row
            # is dirty only if the merge would add or change a key.
            if any(k not in current or current[k] != v for k, v in value.items()):
                current.update(value)
                changed = True
        elif current != value:
            setattr(obj, name, value)
            changed = True

    return changed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment