Skip to content

Instantly share code, notes, and snippets.

@om2c0de
Forked from Bahus/json_schemed_field.py
Created January 30, 2020 12:45
Show Gist options
  • Save om2c0de/ad429927345e43fe2d2c365759bb9976 to your computer and use it in GitHub Desktop.
Save om2c0de/ad429927345e43fe2d2c365759bb9976 to your computer and use it in GitHub Desktop.
Django JSONField with Pydantic schema support
from functools import partial
import pydantic
import logging
from django.contrib.postgres.fields import JSONField
from typing import Type, Union, Tuple
from django.core.serializers.json import DjangoJSONEncoder
# Module-level logger; default_error_handler reports schema parse failures through it.
logger = logging.getLogger(__name__)
def default_error_handler(obj, errors):
    """
    Log a warning about an object that matched no schema and hand it back.

    :param obj: the raw value that could not be parsed.
    :param errors: the collected parse errors; only used for the log record.
    :return: ``obj`` itself, unchanged.
    """
    # Keep lazy %-style arguments so formatting only happens if the record is emitted.
    message = 'Can not parse stored object with schema obj=%s, errors=%s'
    logger.warning(message, obj, errors)
    return obj
class FieldToPythonSetter:
    """
    Forces Django to call to_python on fields when setting them.

    This is useful when you want to add some custom field data postprocessing.
    It should be added to a field like so:

    ```
    def contribute_to_class(self, cls, name, *args, **kwargs):
        super(JSONField, self).contribute_to_class(cls, name, *args, **kwargs)
        setattr(cls, name, FieldToPythonSetter(self))
    ```
    """

    def __init__(self, field):
        """
        :param field: object exposing ``name`` (the key used in the instance
            ``__dict__``) and ``to_python(value)`` (applied on every assignment).
        """
        self.field = field

    def __get__(self, obj, cls=None):
        """
        Return the value stored on ``obj`` under the field's name.

        On class-level access (``obj is None``) the descriptor itself is
        returned. Raises ``KeyError`` if nothing has been stored on the
        instance under that name yet.
        """
        if obj is None:
            # Accessed on the class rather than an instance: there is no
            # instance __dict__ to read, so follow the usual descriptor
            # convention and return the descriptor itself.
            return self
        return obj.__dict__[self.field.name]

    def __set__(self, obj, value):
        """Store ``field.to_python(value)`` on ``obj`` under the field's name."""
        obj.__dict__[self.field.name] = self.field.to_python(value)
class JSONSchemedEncoder(DjangoJSONEncoder):
    """
    JSON encoder that serialises pydantic models through their own ``json()``.

    Any other object is encoded by ``DjangoJSONEncoder`` as usual.
    """

    def __init__(
        self,
        *args,
        schema: Union[Tuple[Type[pydantic.BaseModel]], Type[pydantic.BaseModel]],
        **kwargs
    ):
        """
        :param schema: a single schema class or a tuple of schema classes;
            always stored as a tuple in ``self.schemas``.
        """
        # Normalise a lone schema class into a one-element tuple.
        self.schemas = schema if isinstance(schema, tuple) else (schema, )
        super().__init__(*args, **kwargs)

    def encode(self, obj):
        """Return the JSON text for ``obj``."""
        if isinstance(obj, pydantic.BaseModel):
            return obj.json()
        # This flow is used for expressions like .filter(data__contains={});
        # we don't want that {} to be parsed as a schema.
        return super().encode(obj)
class JSONSchemedDecoder:
    """
    Turns raw values into instances of one of the configured pydantic schemas.

    Schemas are tried in order; the first one that parses the value wins.
    """

    def __init__(
        self,
        schema: Union[Tuple[Type[pydantic.BaseModel], ...], Type[pydantic.BaseModel]],
        error_handler=default_error_handler,
    ):
        """
        :param schema: a single schema class or a tuple of schema classes;
            always stored as a tuple in ``self.schemas``.
        :param error_handler: callable ``(obj, errors)`` invoked when no schema
            accepts the value; its return value becomes the result of
            ``decode``.
        """
        self.schemas = schema if isinstance(schema, tuple) else (schema, )
        # Keep the handler so decode() honours a caller-supplied one instead
        # of always falling back to the module default.
        self.error_handler = error_handler

    def decode(self, obj):
        """
        Parse ``obj`` with the first schema that accepts it.

        :param obj: either an instance of one of the schemas (returned as is)
            or a raw value handed to ``schema.parse_obj``.
        :return: the parsed model, or whatever ``self.error_handler`` returns
            when every schema fails. The handler receives ``obj`` and a list
            of ``(schema, error_details)`` pairs.
        """
        if isinstance(obj, self.schemas):
            # Already a model instance: nothing to parse.
            return obj

        errors = []
        for schema in self.schemas:
            try:
                return schema.parse_obj(obj)
            except pydantic.ValidationError as exc:
                errors.append((schema, exc.errors()))
            except TypeError as exc:
                errors.append((schema, str(exc)))

        return self.error_handler(obj, errors)
class JSONSchemedField(JSONField):
    """
    JSONField whose values are converted with pydantic schemas.

    Values assigned to the model attribute go through ``to_python`` (via the
    ``FieldToPythonSetter`` installed in ``contribute_to_class``), which
    delegates to a ``JSONSchemedDecoder``.
    """

    def __init__(self, *args, schema=None, error_handler=default_error_handler, **kwargs):
        """
        :param schema: required; a pydantic model class, a tuple of such
            classes, or a ``typing.Union`` of such classes.
        :param error_handler: passed through to ``JSONSchemedDecoder``.
        :raises AssertionError: if ``schema`` is ``None`` or of an unsupported type.
        """
        super().__init__(*args, **kwargs)
        self._schemas = self._populate_schemas(schema)
        self.decoder = JSONSchemedDecoder(schema=self._schemas, error_handler=error_handler)
        # The encoder is stored as a factory with the schemas pre-bound.
        self.encoder = partial(JSONSchemedEncoder, schema=self._schemas)

    def deconstruct(self):
        """Extend the parent's deconstruction with the normalised ``schema`` kwarg."""
        name, path, args, kwargs = super().deconstruct()
        kwargs['schema'] = self._schemas
        return name, path, args, kwargs

    @staticmethod
    def _populate_schemas(schema) -> Tuple[Type[pydantic.BaseModel], ...]:
        """
        Normalise ``schema`` into a tuple of pydantic model classes.

        Accepted inputs: a tuple (returned unchanged), a single
        ``pydantic.BaseModel`` subclass, or a ``typing.Union`` whose members
        are all ``pydantic.BaseModel`` subclasses.

        :raises AssertionError: for ``None`` or any other input. The checks
            are explicit raises rather than ``assert`` statements so they are
            still enforced when Python runs with ``-O``.
        """
        if schema is None:
            raise AssertionError('Schema can not be None')

        if isinstance(schema, tuple):
            return schema

        if isinstance(schema, type) and issubclass(schema, pydantic.BaseModel):
            return (schema, )

        if getattr(schema, '__origin__', None) is Union:
            members = schema.__args__
            for member in members:
                # isinstance(..., type) first: issubclass() raises TypeError
                # for Union members that are not classes.
                if not (isinstance(member, type) and issubclass(member, pydantic.BaseModel)):
                    raise AssertionError(
                        'Unsupported schema type in Union: {0!r}'.format(member)
                    )
            return members

        # only pydantic.BaseModel and typing.Union are supported
        raise AssertionError('Unsupported schema type: {0}'.format(type(schema)))

    def to_python(self, value):
        """Return ``None`` for ``None``; otherwise decode ``value`` with the schemas."""
        if value is None:
            return None
        return self.decoder.decode(value)

    def contribute_to_class(self, cls, name, *args, **kwargs):
        """
        Register the field on ``cls`` and install the converting descriptor.

        NOTE(review): ``super(JSONField, self)`` starts the lookup *after*
        ``JSONField`` in the MRO, so any ``contribute_to_class`` defined on
        ``JSONField`` itself is skipped; presumably intentional, confirm.
        """
        super(JSONField, self).contribute_to_class(cls, name, *args, **kwargs)
        # Replace the class attribute so every assignment runs to_python().
        setattr(cls, name, FieldToPythonSetter(self))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment