"""
Created Mar 2, 2018

Serialiser for nameko
"""
import json
from datetime import datetime, timedelta
import pendulum
import pytz
from kombu.serialization import register
from miso.utils import ServiceResult
class SpecialJSONEncoder(json.JSONEncoder):
    """
    JSON encoder that also handles datetime, timedelta and ServiceResult.

    Such objects are emitted as plain dictionaries carrying a ``__type__``
    tag, so that SpecialJSONDecoder can turn them back into python objects.
    """

    @staticmethod
    def _timezone_name(o):
        """
        Return a timezone name for the datetime ``o``, or None when naive.

        The attributes tried are ``zone``, ``name`` and ``key``; when none of
        them yields a value the datetime's own ``tzname()`` is used.
        NOTE(review): ``zone`` is presumably the pytz spelling, ``name`` the
        pendulum one and ``key`` the zoneinfo one -- confirm against the
        timezone classes actually in use.
        NOTE(review): a timezone exposing no name at all is encoded as None
        and will therefore come back as a naive value.
        """
        tzinfo = o.tzinfo
        if tzinfo is None:
            return None
        for attr in ('zone', 'name', 'key'):
            name = getattr(tzinfo, attr, None)
            if name:
                return name
        return o.tzname()

    def default(self, o):  # pylint: disable=E0202
        """
        Convert ``o`` into something the stock JSON encoder can serialise.

        Returns a ``__type__``-tagged dict for datetime, timedelta and
        ServiceResult instances; anything else is handed to the base class,
        which raises TypeError for unsupported objects.
        """
        if isinstance(o, datetime):
            return {
                '__type__': 'datetime',
                'year': o.year,
                'month': o.month,
                # 'day' is mandatory: the decoder feeds these keys straight
                # into datetime(**dict_), which cannot be built without it.
                'day': o.day,
                'hour': o.hour,
                'minute': o.minute,
                'second': o.second,
                'microsecond': o.microsecond,
                'tzinfo': self._timezone_name(o),
            }
        if isinstance(o, timedelta):
            return {
                '__type__': 'timedelta',
                'days': o.days,
                'seconds': o.seconds,
                'microseconds': o.microseconds,
            }
        if isinstance(o, ServiceResult):
            return dict(__type__='ServiceResult', **o.to_dict())
        return json.JSONEncoder.default(self, o)
class SpecialJSONDecoder(json.JSONDecoder):
    """
    JSON decoder that restores datetime, timedelta and ServiceResult objects.

    It reverses SpecialJSONEncoder: any JSON object carrying a ``__type__``
    tag it recognises is converted back into the matching python object.
    """

    def __init__(self, **kwargs):
        """
        Install ``dict_to_object`` as the object hook.

        Extra keyword arguments are forwarded to ``json.JSONDecoder`` so the
        class can also be used as ``json.loads(..., cls=SpecialJSONDecoder)``
        together with other decoder options. Passing ``object_hook`` raises
        TypeError because this class supplies its own.
        """
        json.JSONDecoder.__init__(
            self, object_hook=self.dict_to_object, **kwargs)

    def dict_to_object(self, dict_):
        """ Convert a dictionary with a __type__ key into special objects """
        if '__type__' not in dict_:
            return dict_

        type_ = dict_.pop('__type__')

        if type_ == 'datetime':
            tz_name = dict_.pop('tzinfo', None)
            naive = datetime(**dict_)
            if tz_name is None:
                return pendulum.instance(naive)
            # pytz zones must be attached with localize(); handing them to
            # the datetime constructor as tzinfo picks a wrong (LMT) offset
            # for many zones.
            return pendulum.instance(pytz.timezone(tz_name).localize(naive))

        if type_ == 'timedelta':
            return timedelta(**dict_)

        if type_ == 'ServiceResult':
            return ServiceResult(locked=True, **dict_)

        # Unknown tag: restore the key we popped and pass the dict through.
        dict_['__type__'] = type_
        return dict_
# Register this serializer with kombu for use throughout our nameko project.
# We should consider a better way than doing this upon import..
# NOTE(review): the registration name below is reconstructed by analogy with
# the content type -- confirm it matches the serializer name the services are
# configured with.
register(
    'better-json',
    SpecialJSONEncoder().encode,
    SpecialJSONDecoder().decode,
    'application/x-better-json',
    'utf-8',
)
