Last active
December 13, 2015 22:49
-
-
Save davesnowdon/4987712 to your computer and use it in GitHub Desktop.
Serialise custom python objects to/from JSON using 2 helper functions in the classes to handle the conversion for each class.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import inspect | |
import json | |
'''
Functions for clients to call to serialise/deserialise objects to and from
strings and files.
The idea is taken from http://getpython3.com/diveintopython3/serializing.html
and generalised so that the helper functions are part of the custom classes,
which avoids case statements.
'''
def to_json_file(obj, fp):
    """Write ``obj`` as JSON to the file-like object ``fp``.

    Objects the json module cannot encode natively are passed to
    ``to_json_helper``.
    """
    json.dump(obj, fp, default=to_json_helper)
def to_json_string(obj):
    """Return ``obj`` encoded as a JSON string.

    Objects the json module cannot encode natively are passed to
    ``to_json_helper``.
    """
    encoded = json.dumps(obj, default=to_json_helper)
    return encoded
def from_json_file(fp):
    """Decode JSON read from the file-like object ``fp``.

    Every decoded JSON object (dict) is passed through ``from_json_helper``.
    """
    decoded = json.load(fp, object_hook=from_json_helper)
    return decoded
def from_json_string(sv):
    """Decode the JSON string ``sv``.

    Every decoded JSON object (dict) is passed through ``from_json_helper``.
    """
    decoded = json.loads(sv, object_hook=from_json_helper)
    return decoded
def class_to_name(klass):
    """Return the unqualified name of the class ``klass``."""
    name = klass.__name__
    return name
def object_to_name(obj):
    """Return the unqualified class name of the instance ``obj``."""
    klass = obj.__class__
    return klass.__name__
def object_to_FQCN(obj):
    """Return the fully qualified class name of ``obj``: ``<module>.<class>``."""
    module_name = obj.__module__
    class_name = obj.__class__.__name__
    return module_name + '.' + class_name
# Keys of the dictionary that represents a serialised custom object:
# CLASS_TAG maps to the fully qualified class name, VALUE_TAG to the
# (optional) class-specific payload.
CLASS_TAG = '__class__'
VALUE_TAG = '__value__'
def FQCN_to_module(s):
    """Remove the class/function name from ``s`` to leave the module name.

    Returns whatever ``split_FQCN`` yields as its first element.
    """
    return split_FQCN(s)[0]
def FQCN_to_class(s):
    """Strip the module name from ``s`` to leave the class/function name.

    Returns whatever ``split_FQCN`` yields as its second element.
    """
    return split_FQCN(s)[1]
def split_FQCN(s):
    """Split a fully qualified name into its module and class parts.

    The split happens at the last '.' in ``s``.

    Returns a 2-tuple ``(module_name, class_name)``. When ``s`` contains no
    '.', ``module_name`` is ``None`` and ``class_name`` is ``s`` itself.
    A tuple is returned in both cases so callers always get the same type.
    """
    module_name, dot, class_name = s.rpartition('.')
    if not dot:
        # No separator at all: the whole string is the class name.
        return (None, s)
    return (module_name, class_name)
def find_class(fqcn):
    """Look up a class from a fully qualified class name (FQCN).

    ``fqcn`` is split with ``split_FQCN``; the module part is imported and
    the class part is looked up as an attribute of that module.

    Returns the class object.

    Raises TypeError when the name has no module part, when the module
    cannot be imported, or when the module has no class of that name.
    """
    # Local import so this function does not depend on a new file-level import.
    import importlib

    (moduleName, className) = split_FQCN(fqcn)
    if not moduleName:
        # Without a module part there is nothing to import or search.
        raise TypeError("Can't find class " + fqcn)

    try:
        module = importlib.import_module(moduleName)
    except ImportError:
        raise TypeError("Can't find class " + fqcn +
                        " (unable to import from " + moduleName + ")")

    candidate = getattr(module, className, None)
    if inspect.isclass(candidate):
        return candidate
    raise TypeError("Can't find class " + fqcn)
def object_to_json(obj, value):
    """Convenience function to generate a dictionary in the correct format.

    The result always maps CLASS_TAG to the FQCN of ``obj``; VALUE_TAG is
    added, mapped to ``value``, only when ``value`` is not None.
    """
    tagged = {CLASS_TAG: object_to_FQCN(obj)}
    if value is not None:
        tagged[VALUE_TAG] = value
    return tagged
def to_json_helper(python_object):
    """Helper to generate JSON for custom classes.

    Intended as the ``default`` callback of ``json.dump``/``json.dumps``:
    calls ``python_object.to_json()`` and returns its result.

    Raises TypeError when ``python_object`` has no ``to_json`` attribute.
    Exceptions raised inside ``to_json`` itself propagate unchanged, so a bug
    in a class's ``to_json`` is not misreported as "not JSON serializable".
    """
    method = getattr(python_object, 'to_json', None)
    if method is None:
        raise TypeError(repr(python_object) + ' is not JSON serializable')
    # Called outside any try block on purpose: see docstring.
    return method()
def from_json_helper(json_object):
    """Helper to detect serialised classes and call from_json on them.

    Intended as the ``object_hook`` of ``json.load``/``json.loads``.

    If ``json_object`` contains CLASS_TAG, the named class is located with
    ``find_class`` and its ``from_json`` is called with the value stored
    under VALUE_TAG (or None when that key is absent); the result is
    returned. If the class has no ``from_json``, or the dictionary is not
    tagged, ``json_object`` is returned unchanged.

    Exceptions raised inside ``from_json`` propagate to the caller rather
    than being swallowed or triggering a retry with None.

    NOTE(security): the class name comes from the JSON input and causes a
    module import via ``find_class``; only use this on trusted input.
    """
    # Not an object we serialised and tagged with the class name.
    if CLASS_TAG not in json_object:
        return json_object

    klass = find_class(json_object[CLASS_TAG])

    factory = getattr(klass, 'from_json', None)
    if factory is None:
        # Class does not support being reconstituted from JSON.
        return json_object

    # Resolve the payload before the call so that a KeyError raised inside
    # from_json is not mistaken for a missing VALUE_TAG.
    return factory(json_object.get(VALUE_TAG))
import unittest | |
class TestFreeFunctions(unittest.TestCase):
    """Test free functions in general utilities."""

    def test_fqcn_to_module(self):
        module_part = FQCN_to_module("wanderer.event.Start")
        self.assertEqual(module_part, "wanderer.event")

    def test_fqcn_to_module_no_module(self):
        module_part = FQCN_to_module("Start")
        self.assertIsNone(module_part)

    def test_fqcn_to_class(self):
        class_part = FQCN_to_class("wanderer.event.Start")
        self.assertEqual(class_part, "Start")

    def test_fqcn_to_class_no_module(self):
        class_part = FQCN_to_class("Start")
        self.assertEqual(class_part, "Start")

    def test_find_class(self):
        found = find_class('wanderer_tests.test_general.JsonTestBase')
        self.assertIsNotNone(found)
''' | |
Tests for JSON serialization of custom objects | |
''' | |
class JsonTestBase(object):
    """Base sample class with no data, supporting the JSON helper protocol.

    Instances compare equal when their attribute dictionaries are equal.
    """

    def __init__(self):
        super(JsonTestBase, self).__init__()

    def name(self):
        """Return the unqualified class name of this instance."""
        return self.__class__.__name__

    def __eq__(self, other):
        # Comparing against unrelated objects (e.g. an int, which has no
        # __dict__) must not raise; let Python fall back to its default.
        if not isinstance(other, JsonTestBase):
            return NotImplemented
        return self.__dict__ == other.__dict__

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so define it explicitly.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    # used to support JSON serialisation of custom classes
    def to_json(self):
        """Return the tagged dictionary for this object (no value payload)."""
        return object_to_json(self, None)

    # used to enable this class & sub types to be reconstituted from JSON
    @classmethod
    def from_json(klass, json_object):
        """Build a new instance of ``klass``; ``json_object`` is ignored."""
        return klass()
class JsonNoData(JsonTestBase):
    """Sample class that relies entirely on the base class JSON support."""

    def __init__(self):
        # Nothing to add beyond the base class initialisation.
        super(JsonNoData, self).__init__()
class JsonWithData(JsonTestBase):
    """Sample class with additional data (source, sensorData) to serialise."""

    def __init__(self, source, sensorData):
        super(JsonWithData, self).__init__()
        self.source = source
        self.sensorData = sensorData

    # used to support JSON serialisation of custom classes
    def to_json(self):
        """Return the tagged dictionary carrying source and sensorData."""
        payload = {'source': self.source, 'sensorData': self.sensorData}
        return object_to_json(self, payload)

    # used to enable this class & sub types to be reconstituted from JSON
    @classmethod
    def from_json(klass, json_object):
        """Build an instance from a dict with 'source' and 'sensorData' keys."""
        print("json_object = " + repr(json_object))
        source = json_object['source']
        sensor_data = json_object['sensorData']
        return klass(source, sensor_data)
class TestJson(unittest.TestCase):
    """Round-trip each sample class through to_json_string/from_json_string."""

    def test_json_serialise_base(self):
        original = JsonTestBase()
        serialised = to_json_string(original)
        print("Serialisation of JsonTestbase = \n" + serialised)
        self.assertIsNotNone(serialised, "Serialised object should not be None")
        restored = from_json_string(serialised)
        self.assertTrue(isinstance(restored, JsonTestBase))
        message = "Reconstituted object " + repr(restored) + " must equal original " + repr(original)
        self.assertEqual(original, restored, message)

    def test_json_no_data(self):
        original = JsonNoData()
        serialised = to_json_string(original)
        print("Serialisation of JsonNoData = \n" + serialised)
        self.assertIsNotNone(serialised, "Serialised object should not be None")
        restored = from_json_string(serialised)
        self.assertTrue(isinstance(restored, JsonNoData))
        message = "Reconstituted object " + repr(restored) + " must equal original " + repr(original)
        self.assertEqual(original, restored, message)

    def test_json_with_data(self):
        original = JsonWithData('foo', {'a': 123, 'b': 456})
        serialised = to_json_string(original)
        print("Serialisation of JsonWithData = \n" + serialised)
        self.assertIsNotNone(serialised, "Serialised object should not be None")
        restored = from_json_string(serialised)
        self.assertTrue(isinstance(restored, JsonWithData))
        message = "Reconstituted object " + repr(restored) + " must equal original " + repr(original)
        self.assertEqual(original, restored, message)
# Run the unit tests defined in this file when it is executed as a script.
if __name__ == '__main__':
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment