-
-
Save miracle2k/52a031cced285ba9b8cd to your computer and use it in GitHub Desktop.
Supports raw JSON objects, OrderedDict, and dates.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import sqlalchemy | |
from sqlalchemy import UnicodeText | |
from sqlalchemy.ext.mutable import Mutable | |
from datetime import datetime | |
from collections import OrderedDict | |
__all__ = ('JSON', 'JsonRaw') | |
def date_handler(obj):
    """Serialize date-like objects for ``json.dumps(..., default=date_handler)``.

    Any object exposing an ``isoformat()`` method (``datetime``, ``date``,
    ``time``) is converted to the string returned by that method.

    Raises:
        TypeError: if *obj* has no ``isoformat`` method. Returning the object
            unchanged (the previous behaviour) makes ``json.dumps`` fail with
            a misleading "Circular reference detected" ``ValueError``; the
            ``default`` hook is expected to raise ``TypeError`` instead.
    """
    if hasattr(obj, 'isoformat'):
        return obj.isoformat()
    raise TypeError(
        'Object of type %s is not JSON serializable' % type(obj).__name__)
try:
    _text_type = unicode  # noqa: F821 -- Python 2: keep the original base class
except NameError:
    _text_type = str  # Python 3: ``str`` is the text type


class JsonRaw(_text_type):
    """Allows interacting with a JSON field using a raw string.

    The string is treated as already-encoded JSON, so it must not be encoded
    a second time when it is written to the column.

    For example::

        instance.json_column = JsonRaw('{"a": 4}')
    """
# Mapping type used for decoded JSON objects and as the MutationDict base.
# Switch this to the built-in ``dict`` when key order does not matter.
DictClass = OrderedDict
class JSONEncodedObj(sqlalchemy.types.TypeDecorator):
    """Column type that stores an immutable structure as a JSON string.

    When a ``default`` factory is given (for example ``dict``), a NULL or
    otherwise empty database value is exposed as ``default()`` instead of
    ``None``.
    """

    impl = UnicodeText

    def __init__(self, *args, **kwargs):
        # ``default`` is consumed here; all remaining arguments are forwarded
        # to TypeDecorator unchanged.
        self.default = kwargs.pop('default', None)
        super(JSONEncodedObj, self).__init__(*args, **kwargs)

    def process_bind_param(self, value, dialect):
        """Return the text to store for *value*.

        ``None`` and ``JsonRaw`` instances are passed through untouched;
        everything else is encoded with ``json.dumps`` (date-like objects go
        through ``date_handler``).
        """
        if value is None or isinstance(value, JsonRaw):
            return value
        return json.dumps(value, default=date_handler)

    def process_result_value(self, value, dialect):
        """Decode the stored text back into Python objects.

        A falsy stored value yields ``self.default()`` when a default factory
        was configured; ``None`` stays ``None`` otherwise. JSON objects are
        decoded into ``DictClass`` instances.
        """
        if not value and self.default is not None:
            return self.default()
        if value is None:
            return None
        return json.loads(value, object_pairs_hook=DictClass)
class MutationObj(Mutable):
    """Mutable base that routes plain dicts and lists to their tracked types."""

    @classmethod
    def coerce(cls, key, value):
        """Wrap a plain ``dict``/``list`` in its tracked counterpart.

        Values that are already tracked, or that are neither a dict nor a
        list, are returned unchanged.
        """
        for plain_type, tracked_type in ((dict, MutationDict),
                                         (list, MutationList)):
            if isinstance(value, plain_type) and not isinstance(value, tracked_type):
                return tracked_type.coerce(key, value)
        return value

    @classmethod
    def _listen_on_attribute(cls, attribute, coerce, parent_cls):
        """Register the event listeners that keep ``_parents`` up to date.

        Nothing is registered unless *parent_cls* is the class that owns
        *attribute*; the listeners are registered with ``propagate=True``.
        """
        key = attribute.key
        if parent_cls is not attribute.class_:
            return

        # rely on "propagate" here
        parent_cls = attribute.class_
        values_key = 'ext.mutable.values'
        listen = sqlalchemy.event.listen

        def on_load(state, *args):
            # Used for both 'load' and 'refresh': optionally coerce the loaded
            # value in place, then link it to the owning object.
            current = state.dict.get(key, None)
            if coerce:
                current = cls.coerce(key, current)
                state.dict[key] = current
            if isinstance(current, cls):
                current._parents[state.obj()] = key

        def on_set(target, value, oldvalue, initiator):
            # Coerce the incoming value, link it to the owner and unlink the
            # value it replaces. The (possibly coerced) value is returned
            # because the listener is registered with retval=True.
            if not isinstance(value, cls):
                value = cls.coerce(key, value)
            if isinstance(value, cls):
                value._parents[target.obj()] = key
            if isinstance(oldvalue, cls):
                oldvalue._parents.pop(target.obj(), None)
            return value

        def on_pickle(state, state_dict):
            # Record tracked values in the pickled state so on_unpickle can
            # re-link them.
            current = state.dict.get(key, None)
            if isinstance(current, cls):
                state_dict.setdefault(values_key, []).append(current)

        def on_unpickle(state, state_dict):
            for tracked in state_dict.get(values_key, ()):
                tracked._parents[state.obj()] = key

        listen(parent_cls, 'load', on_load, raw=True, propagate=True)
        listen(parent_cls, 'refresh', on_load, raw=True, propagate=True)
        listen(attribute, 'set', on_set, raw=True, retval=True, propagate=True)
        listen(parent_cls, 'pickle', on_pickle, raw=True, propagate=True)
        listen(parent_cls, 'unpickle', on_unpickle, raw=True, propagate=True)
class MutationDict(MutationObj, DictClass):
    """``DictClass`` mapping that calls ``changed()`` when modified in place.

    Values stored in the mapping are passed through ``MutationObj.coerce`` so
    that nested plain dicts/lists are tracked as well.
    """

    @classmethod
    def coerce(cls, key, value):
        """Convert plain dictionary to MutationDict"""
        self = MutationDict(
            (k, MutationObj.coerce(key, v)) for (k, v) in value.items())
        self._key = key
        return self

    def __setitem__(self, key, value):
        # Due to the way OrderedDict works, this is called during __init__.
        # At this time we don't have a key set, but what is more, the value
        # being set has already been coerced. So special case this and skip.
        if hasattr(self, '_key'):
            value = MutationObj.coerce(self._key, value)
        DictClass.__setitem__(self, key, value)
        self.changed()

    def __delitem__(self, key):
        DictClass.__delitem__(self, key)
        self.changed()

    def clear(self):
        """Remove all items and report the change.

        Neither ``OrderedDict.clear`` nor ``dict.clear`` goes through
        ``__delitem__``, so without this override emptying the mapping would
        never call ``changed()``.
        """
        DictClass.clear(self)
        self.changed()

    # NOTE(review): if DictClass is switched to the built-in ``dict``,
    # update()/pop()/popitem()/setdefault() also bypass __setitem__ /
    # __delitem__ and would need equivalent overrides -- confirm before
    # changing DictClass.
class MutationList(MutationObj, list):
    """``list`` that calls ``changed()`` after every in-place modification.

    Values added to the list are passed through ``MutationObj.coerce`` so
    that nested plain dicts/lists are tracked as well.
    """

    @classmethod
    def coerce(cls, key, value):
        """Convert plain list to MutationList"""
        self = MutationList(MutationObj.coerce(key, v) for v in value)
        self._key = key
        return self

    def _coerce_item(self, value):
        """Coerce one incoming element using this list's attribute key."""
        # ``_key`` is only assigned by coerce(); a mutator may run before it
        # exists (e.g. on a directly constructed list, or presumably while
        # pickle replays items before restoring attributes), so fall back to
        # None instead of raising AttributeError.
        return MutationObj.coerce(getattr(self, '_key', None), value)

    def __setitem__(self, idx, value):
        if isinstance(idx, slice):
            # Slice assignment (always on Python 3, extended slices on
            # Python 2): coerce each element, not the iterable as a whole.
            value = [self._coerce_item(v) for v in value]
        else:
            value = self._coerce_item(value)
        list.__setitem__(self, idx, value)
        self.changed()

    def __setslice__(self, start, stop, values):
        # Python 2 only: simple slice assignment.
        list.__setslice__(
            self, start, stop, [self._coerce_item(v) for v in values])
        self.changed()

    def __delitem__(self, idx):
        list.__delitem__(self, idx)
        self.changed()

    def __delslice__(self, start, stop):
        # Python 2 only: simple slice deletion.
        list.__delslice__(self, start, stop)
        self.changed()

    def __iadd__(self, values):
        # ``list.__iadd__`` does not call the overridden extend(), so route
        # ``lst += values`` through it to coerce and report the change.
        self.extend(values)
        return self

    def append(self, value):
        list.append(self, self._coerce_item(value))
        self.changed()

    def insert(self, idx, value):
        list.insert(self, idx, self._coerce_item(value))
        self.changed()

    def extend(self, values):
        list.extend(self, (self._coerce_item(v) for v in values))
        self.changed()

    def pop(self, *args, **kw):
        value = list.pop(self, *args, **kw)
        self.changed()
        return value

    def remove(self, value):
        list.remove(self, value)
        self.changed()

    def clear(self):
        """Remove all items; slice deletion above reports the change."""
        del self[:]

    def sort(self, *args, **kw):
        """Sort in place (arguments as for ``list.sort``) and report it."""
        list.sort(self, *args, **kw)
        self.changed()

    def reverse(self):
        """Reverse in place and report the change."""
        list.reverse(self)
        self.changed()
JSON = MutationObj.as_mutable(JSONEncodedObj)
"""A mutation-tracked column type that encodes/decodes JSON on the fly.

Values are stored as JSON text (``JSONEncodedObj.impl`` is ``UnicodeText``)::

    Column(JSON)
"""

# Variants whose NULL/empty database values load as an empty DictClass or an
# empty list, respectively, instead of None.
JSONDict = MutationObj.as_mutable(JSONEncodedObj(default=DictClass))
JSONList = MutationObj.as_mutable(JSONEncodedObj(default=list))
For those using this code (we do!), this needs to be updated to accommodate these changes:
sqlalchemy/sqlalchemy@1031fc6
See sqlalchemy/sqlalchemy#6018 for context.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Holy moly, this is gold!