Skip to content

Instantly share code, notes, and snippets.

Forked from dbarnett/
Last active March 26, 2021 05:19
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save miracle2k/52a031cced285ba9b8cd to your computer and use it in GitHub Desktop.
Save miracle2k/52a031cced285ba9b8cd to your computer and use it in GitHub Desktop.
Support Raw json objects, OrderedDict, dates.
import json
import sqlalchemy
from sqlalchemy import UnicodeText
from sqlalchemy.ext.mutable import Mutable
from datetime import datetime
from collections import OrderedDict
# Names exported by ``from <module> import *``: the mutation-tracking JSON
# column type and the raw-string wrapper class.
__all__ = ('JSON', 'JsonRaw')
def date_handler(obj):
    """Serialize date-like objects for ``json.dumps(..., default=...)``.

    Any object exposing an ``isoformat()`` method (for example ``datetime``,
    ``date`` or ``time``) is converted to the string that method returns.

    Args:
        obj: the object the JSON encoder could not serialize on its own.

    Returns:
        The result of ``obj.isoformat()``.

    Raises:
        TypeError: if ``obj`` has no ``isoformat`` method. Returning the
            object unchanged would make the encoder call this hook on the
            same object again and fail with a misleading
            "Circular reference detected" error, so the unsupported type is
            reported explicitly instead.
    """
    if hasattr(obj, 'isoformat'):
        return obj.isoformat()
    raise TypeError(
        'Object of type %s is not JSON serializable' % type(obj).__name__
    )
try:
    _text_type = unicode  # noqa: F821 -- Python 2 text type
except NameError:
    # Python 3 has no ``unicode`` builtin; ``str`` is already the text type.
    _text_type = str


class JsonRaw(_text_type):
    """Allows interacting with a JSON field using a raw string.

    A ``JsonRaw`` value is handed to the database as-is by
    ``JSONEncodedObj.process_bind_param`` instead of being passed through
    ``json.dumps``, so the caller is responsible for it being valid JSON.

    For example::

        instance.json_column = JsonRaw('{"a": 4}')
    """
# Mapping type used to decode JSON objects (``object_pairs_hook``) and as the
# base class of ``MutationDict``.
# Set this to the standard dict if order is not required.
DictClass = OrderedDict
class JSONEncodedObj(sqlalchemy.types.TypeDecorator):
    """Represents an immutable structure as a json-encoded string.

    If default is, for example, a dict, then a NULL value in the
    database will be exposed as an empty dict.
    """

    # Underlying column type the JSON text is stored in.
    impl = UnicodeText

    def __init__(self, *args, **kwargs):
        """Create the type.

        Args:
            *args: forwarded unchanged to ``TypeDecorator.__init__``.
            **kwargs: forwarded to ``TypeDecorator.__init__`` after the
                ``default`` key is removed. ``default`` is an optional
                zero-argument callable (e.g. ``dict`` or ``list``) used to
                build the Python value for empty/NULL database values.
        """
        self.default = kwargs.pop('default', None)
        super(JSONEncodedObj, self).__init__(*args, **kwargs)

    def process_bind_param(self, value, dialect):
        """Convert a Python value into the text sent to the database.

        ``None`` is passed through, and so is a ``JsonRaw`` string (it is
        treated as already-encoded JSON). Everything else is encoded with
        ``json.dumps``, using ``date_handler`` for objects the encoder does
        not support natively.
        """
        if value is None or isinstance(value, JsonRaw):
            return value
        return json.dumps(value, default=date_handler)

    def process_result_value(self, value, dialect):
        """Convert the text read from the database into a Python value.

        When a ``default`` factory was configured, any falsy database value
        (NULL or an empty string) yields a fresh ``self.default()``.
        Otherwise NULL stays ``None`` and any other value is decoded with
        ``json.loads``, building JSON objects as ``DictClass`` instances.
        """
        if self.default is not None and not value:
            return self.default()
        if value is not None:
            value = json.loads(value, object_pairs_hook=DictClass)
        return value
class MutationObj(Mutable):
    """Base class for the change-tracking containers in this module.

    ``coerce`` turns plain ``dict``/``list`` values into ``MutationDict`` /
    ``MutationList`` and leaves every other value untouched.
    """

    @classmethod
    def coerce(cls, key, value):
        """Wrap plain dicts and lists in their mutation-tracking counterpart.

        Args:
            key: attribute key the value is stored under; it is handed on to
                the wrapper's own ``coerce``.
            value: the value being assigned or loaded.

        Returns:
            A ``MutationDict`` for a plain ``dict``, a ``MutationList`` for a
            plain ``list``, otherwise ``value`` itself.
        """
        if isinstance(value, dict) and not isinstance(value, MutationDict):
            return MutationDict.coerce(key, value)
        if isinstance(value, list) and not isinstance(value, MutationList):
            return MutationList.coerce(key, value)
        return value

    @classmethod
    def _listen_on_attribute(cls, attribute, coerce, parent_cls):
        """Register the ORM event listeners that keep ``_parents`` current.

        Args:
            attribute: the instrumented attribute to watch.
            coerce: when true, values found on load/refresh are passed
                through ``cls.coerce`` and written back to the state dict.
            parent_cls: the mapped class the listeners are requested for.

        NOTE(review): parent objects are tracked here under ``state.obj()``.
        A comment on the original gist (referencing
        sqlalchemy/sqlalchemy#6018) says this code needs updating for newer
        SQLAlchemy releases -- confirm against the installed version.
        """
        key = attribute.key
        if parent_cls is not attribute.class_:
            # Only the class that owns the attribute registers listeners;
            # subclasses are covered by ``propagate=True`` below. Without
            # this early exit the same listeners would be registered again
            # for every subclass.
            return

        # rely on "propagate" here
        parent_cls = attribute.class_

        def load(state, *args):
            # On load/refresh: optionally coerce the stored value, then
            # remember the owning object so changes can be reported to it.
            val = state.dict.get(key, None)
            if coerce:
                val = cls.coerce(key, val)
                state.dict[key] = val
            if isinstance(val, cls):
                val._parents[state.obj()] = key

        def set_(target, value, oldvalue, initiator):
            # On attribute assignment: coerce the new value, link it to the
            # owning object and unlink the value it replaces.
            if not isinstance(value, cls):
                value = cls.coerce(key, value)
            if isinstance(value, cls):
                value._parents[target.obj()] = key
            if isinstance(oldvalue, cls):
                oldvalue._parents.pop(target.obj(), None)
            return value

        def pickle(state, state_dict):
            # Stash the tracked value in the pickled state so ``unpickle``
            # can re-establish the parent link afterwards.
            val = state.dict.get(key, None)
            if isinstance(val, cls):
                if 'ext.mutable.values' not in state_dict:
                    state_dict['ext.mutable.values'] = []
                state_dict['ext.mutable.values'].append(val)

        def unpickle(state, state_dict):
            if 'ext.mutable.values' in state_dict:
                for val in state_dict['ext.mutable.values']:
                    val._parents[state.obj()] = key

        sqlalchemy.event.listen(parent_cls, 'load', load, raw=True, propagate=True)
        sqlalchemy.event.listen(parent_cls, 'refresh', load, raw=True, propagate=True)
        sqlalchemy.event.listen(attribute, 'set', set_, raw=True, retval=True, propagate=True)
        sqlalchemy.event.listen(parent_cls, 'pickle', pickle, raw=True, propagate=True)
        sqlalchemy.event.listen(parent_cls, 'unpickle', unpickle, raw=True, propagate=True)
class MutationDict(MutationObj, DictClass):
    """A ``DictClass`` that reports item assignment and deletion as changes.

    NOTE(review): only ``__setitem__`` and ``__delitem__`` are overridden.
    Whether other mutators (``pop``, ``popitem``, ``clear``, ``update``,
    ``setdefault``) end up reporting a change depends on how ``DictClass``
    implements them -- verify for the mapping type in use.
    """

    @classmethod
    def coerce(cls, key, value):
        """Convert plain dictionary to MutationDict"""
        self = MutationDict(
            (k, MutationObj.coerce(key, v)) for (k, v) in value.items()
        )
        self._key = key
        return self

    def __setitem__(self, key, value):
        # Due to the way OrderedDict works, this is called during __init__.
        # At this time we don't have a key set, but what is more, the value
        # being set has already been coerced. So special case this and skip.
        if hasattr(self, '_key'):
            value = MutationObj.coerce(self._key, value)
        DictClass.__setitem__(self, key, value)
        # Flag the owning attribute as modified so the edit gets persisted.
        self.changed()

    def __delitem__(self, key):
        DictClass.__delitem__(self, key)
        self.changed()
class MutationList(MutationObj, list):
    """A ``list`` that reports in-place modifications as changes.

    New elements are passed through ``MutationObj.coerce`` so nested plain
    dicts/lists become tracked containers as well.

    NOTE(review): ``sort``, ``reverse``, ``__iadd__`` and ``__imul__`` are not
    overridden, so those mutations are not reported.
    """

    @classmethod
    def coerce(cls, key, value):
        """Convert plain list to MutationList"""
        self = MutationList((MutationObj.coerce(key, v) for v in value))
        self._key = key
        return self

    def __setitem__(self, idx, value):
        list.__setitem__(self, idx, MutationObj.coerce(self._key, value))
        self.changed()

    def __setslice__(self, start, stop, values):
        # Python 2 only hook; Python 3 routes slice assignment through
        # ``__setitem__`` with a ``slice`` index.
        list.__setslice__(self, start, stop, (MutationObj.coerce(self._key, v) for v in values))
        self.changed()

    def __delitem__(self, idx):
        list.__delitem__(self, idx)
        self.changed()

    def __delslice__(self, start, stop):
        # Python 2 only hook; see ``__setslice__``.
        list.__delslice__(self, start, stop)
        self.changed()

    def append(self, value):
        list.append(self, MutationObj.coerce(self._key, value))
        self.changed()

    def insert(self, idx, value):
        list.insert(self, idx, MutationObj.coerce(self._key, value))
        self.changed()

    def extend(self, values):
        list.extend(self, (MutationObj.coerce(self._key, v) for v in values))
        self.changed()

    def pop(self, *args, **kw):
        value = list.pop(self, *args, **kw)
        self.changed()
        return value

    def remove(self, value):
        list.remove(self, value)
        self.changed()
JSON = MutationObj.as_mutable(JSONEncodedObj)
"""A type to encode/decode JSON on the fly

sqltype is the string type for the underlying DB column.
"""
# NOTE(review): the docstring above mentions ``sqltype``, but nothing named
# ``sqltype`` is defined in this file (``JSONEncodedObj`` hard-codes
# ``impl = UnicodeText``) -- confirm whether the sentence is stale.

# Variants with a default factory: an empty/NULL database value is exposed as
# a fresh ``DictClass()`` / ``list()`` instead of ``None``.
JSONDict = MutationObj.as_mutable(JSONEncodedObj(default=DictClass))
JSONList = MutationObj.as_mutable(JSONEncodedObj(default=list))
Copy link

Holy moly, this is gold!

Copy link

jdavcs commented Mar 26, 2021

For those using this code (we do!): it needs to be updated to accommodate upstream changes in SQLAlchemy.
See sqlalchemy/sqlalchemy#6018 for context.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment