Skip to content

Instantly share code, notes, and snippets.

@kbni
Created March 2, 2018 05:02
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kbni/1775f5605de8c517c10e8cbbc12660c4 to your computer and use it in GitHub Desktop.
Save kbni/1775f5605de8c517c10e8cbbc12660c4 to your computer and use it in GitHub Desktop.
Serialiser for nameko
import json
from datetime import datetime, timedelta
import pendulum
import pytz
from kombu.serialization import register
from miso.utils import ServiceResult
class SpecialJSONEncoder(json.JSONEncoder):
"""
Converts a python object, where datetime and timedelta objects are converted
into objects that can be decoded using the DateTimeAwareJSONDecoder.
"""
def default(self, o): # pylint: disable=E0202
if isinstance(o, ServiceResult):
return dict(__type__='ServiceResult', **o.to_dict())
if isinstance(o, datetime):
return {
'__type__': 'datetime',
'year': o.year,
'month': o.month,
'day': o.day,
'hour': o.hour,
'minute': o.minute,
'second': o.second,
'microsecond': o.microsecond,
'tzinfo': o.tzinfo.name if o.tzinfo else None
}
elif isinstance(o, timedelta):
return {
'__type__': 'timedelta',
'days': o.days,
'seconds': o.seconds,
'microseconds': o.microseconds,
}
else:
return json.JSONEncoder.default(self, o)
class SpecialJSONDecoder(json.JSONDecoder):
"""
Converts a json string, where datetime and timedelta objects were converted
into objects using the DateTimeAwareJSONEncoder, back into a python object.
"""
def __init__(self):
json.JSONDecoder.__init__(self, object_hook=self.dict_to_object)
def dict_to_object(self, dict_):
""" Convert a dictionary with a __type__ key into special objects """
if '__type__' not in dict_:
return dict_
type_ = dict_.pop('__type__')
if type_ == 'datetime':
usetz = dict_.get('tzinfo', None)
if usetz is not None:
dict_['tzinfo'] = pytz.timezone(usetz)
return pendulum.instance(datetime(**dict_))
elif type_ == 'timedelta':
return timedelta(**dict_)
elif type_ == 'ServiceResult':
return ServiceResult(locked=True, **dict_)
else:
# Oops... better put this back together.
dict_['__type__'] = type_
return dict_
# Register this serializer for use throughout our nameko project
# We should consider a better way than doing this upon import..
register('betterjson',
SpecialJSONEncoder().encode, SpecialJSONDecoder().decode,
'application/x-better-json', 'utf-8')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment