-
-
Save om2c0de/ad429927345e43fe2d2c365759bb9976 to your computer and use it in GitHub Desktop.
Django JSONField with Pydantic schema support
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import partial | |
import pydantic | |
import logging | |
from django.contrib.postgres.fields import JSONField | |
from typing import Type, Union, Tuple | |
from django.core.serializers.json import DjangoJSONEncoder | |
logger = logging.getLogger(__name__) | |
def default_error_handler(obj, errors):
    """Log a warning that ``obj`` could not be parsed and hand it back unchanged.

    :param obj: the raw value that failed to parse.
    :param errors: the collected parse errors; only used for the log record.
    :return: ``obj`` itself, untouched.
    """
    message = 'Can not parse stored object with schema obj=%s, errors=%s'
    # Lazy %-style arguments: formatting only happens if the record is emitted.
    logger.warning(message, obj, errors)
    return obj
class FieldToPythonSetter:
    """
    Descriptor that forces Django to call to_python on a field when it is set.

    This is useful when you want to add some custom field data postprocessing.
    Should be added to a field like so:
    ```
    def contribute_to_class(self, cls, name, *args, **kwargs):
        super(JSONField, self).contribute_to_class(cls, name, *args, **kwargs)
        setattr(cls, name, FieldToPythonSetter(self))
    ```
    """

    def __init__(self, field):
        """Remember the field whose ``name`` and ``to_python`` are used on access."""
        self.field = field

    def __get__(self, obj, cls=None):
        """Return the value stored in the instance ``__dict__`` under the field name.

        When accessed on the class (``obj`` is ``None``) the descriptor itself
        is returned instead of failing on ``None.__dict__``.

        Raises ``KeyError`` if nothing has been stored on the instance yet.
        """
        if obj is None:
            # Class-level access, e.g. ``Model.field_name``.
            return self
        return obj.__dict__[self.field.name]

    def __set__(self, obj, value):
        """Run ``value`` through ``field.to_python`` and store the result on ``obj``."""
        obj.__dict__[self.field.name] = self.field.to_python(value)
class JSONSchemedEncoder(DjangoJSONEncoder):
    """JSON encoder that serializes pydantic models through their own ``json()``.

    Anything that is not a ``pydantic.BaseModel`` instance is encoded by the
    parent ``DjangoJSONEncoder``.
    """

    def __init__(
        self,
        *args,
        schema: Union[Tuple[Type[pydantic.BaseModel]], Type[pydantic.BaseModel]],
        **kwargs
    ):
        """Store ``schema`` as a tuple and forward the remaining arguments.

        :param schema: a single model class or a tuple of model classes
            (keyword-only).
        """
        # A lone model class is wrapped so ``self.schemas`` is always a tuple.
        self.schemas = schema if isinstance(schema, tuple) else (schema, )
        super().__init__(*args, **kwargs)

    def encode(self, obj):
        """Return the JSON text for ``obj``."""
        if isinstance(obj, pydantic.BaseModel):
            return obj.json()
        # this flow used for expressions like .filter(data__contains={})
        # we don't want that {} to be parsed as schema
        return super().encode(obj)
class JSONSchemedDecoder:
    """Turns raw stored values into instances of one of the configured schemas."""

    def __init__(
        self,
        schema: Union[Tuple[Type[pydantic.BaseModel]], Type[pydantic.BaseModel]],
        error_handler=default_error_handler,
    ):
        """
        :param schema: a single model class or a tuple of model classes that
            are tried in order when decoding.
        :param error_handler: callable ``(obj, errors)`` invoked when no schema
            accepts the value; its return value becomes the result of
            ``decode``.
        """
        if not isinstance(schema, tuple):
            self.schemas = (schema, )
        else:
            self.schemas = schema
        # Keep the handler: previously it was accepted but silently dropped,
        # so a custom handler passed by the caller was never invoked.
        self.error_handler = error_handler

    def decode(self, obj):
        """Parse ``obj`` with the first schema that accepts it.

        :param obj: raw value, or an already constructed schema instance.
        :return: ``obj`` itself if it is already an instance of one of the
            schemas; otherwise the first successfully parsed model; otherwise
            whatever ``error_handler(obj, errors)`` returns.
        """
        if isinstance(obj, self.schemas):
            return obj
        # One (schema, error details) pair per schema that rejected the value.
        errors = []
        for schema in self.schemas:
            try:
                return schema.parse_obj(obj)
            except pydantic.ValidationError as exc:
                errors.append((schema, exc.errors()))
            except TypeError as exc:
                errors.append((schema, str(exc)))
        return self.error_handler(obj, errors)
class JSONSchemedField(JSONField):
    """JSONField whose Python-side values are parsed into pydantic models.

    ``schema`` may be a single ``pydantic.BaseModel`` subclass, a tuple of
    them, or a ``typing.Union`` of them.
    """

    def __init__(self, *args, schema=None, error_handler=default_error_handler, **kwargs):
        """
        :param schema: required; see ``_populate_schemas`` for accepted forms.
        :param error_handler: passed to the ``JSONSchemedDecoder`` built for
            this field.
        :raises AssertionError: if ``schema`` is missing or unsupported.
        """
        super().__init__(*args, **kwargs)
        self._schemas = self._populate_schemas(schema)
        self.decoder = JSONSchemedDecoder(schema=self._schemas, error_handler=error_handler)
        # A partial, so the encoder can be instantiated like a plain encoder
        # class while still receiving the schemas.
        self.encoder = partial(JSONSchemedEncoder, schema=self._schemas)

    def deconstruct(self):
        """Extend the parent's deconstruction with the normalized ``schema`` kwarg."""
        name, path, args, kwargs = super().deconstruct()
        kwargs['schema'] = self._schemas
        return name, path, args, kwargs

    @staticmethod
    def _populate_schemas(schema) -> Tuple[Type[pydantic.BaseModel]]:
        """Normalize ``schema`` into a tuple of model classes.

        Validation uses explicit raises rather than ``assert`` statements so
        that it is not stripped when Python runs with ``-O``. The exception
        type stays ``AssertionError`` for backward compatibility.

        :raises AssertionError: if ``schema`` is ``None``, is a ``Union`` with
            a member that is not a ``pydantic.BaseModel`` subclass, or is of
            any other unsupported type.
        """
        if schema is None:
            raise AssertionError('Schema can not be None')
        if isinstance(schema, tuple):
            # Tuples are taken as-is; their members are not inspected here.
            return schema
        if isinstance(schema, type) and issubclass(schema, pydantic.BaseModel):
            return schema,
        if getattr(schema, '__origin__', None) is Union:
            members = schema.__args__
            for member in members:
                # The isinstance guard keeps issubclass from raising TypeError
                # on non-class members such as parametrized generics.
                if not (isinstance(member, type) and issubclass(member, pydantic.BaseModel)):
                    raise AssertionError(
                        'Unsupported schema type in Union: {0}'.format(member)
                    )
            return members
        # only pydantic.BaseModel and typing.Union are supported
        raise AssertionError('Unsupported schema type: {0}'.format(type(schema)))

    def to_python(self, value):
        """Return ``None`` for ``None``; otherwise decode ``value`` via the decoder."""
        if value is None:
            return None
        return self.decoder.decode(value)

    def contribute_to_class(self, cls, name, *args, **kwargs):
        """Register the field, then install the descriptor that runs ``to_python`` on set."""
        # NOTE(review): super(JSONField, self) starts the MRO lookup *after*
        # JSONField, so a contribute_to_class defined on JSONField itself would
        # be bypassed - confirm this is intentional.
        super(JSONField, self).contribute_to_class(cls, name, *args, **kwargs)
        setattr(cls, name, FieldToPythonSetter(self))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment