Skip to content

Instantly share code, notes, and snippets.

@sh4nks
Forked from dbarnett/jsonalchemy.py
Last active August 29, 2015 14:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sh4nks/8dff3ff02a225151817c to your computer and use it in GitHub Desktop.
Save sh4nks/8dff3ff02a225151817c to your computer and use it in GitHub Desktop.
import simplejson
import sqlalchemy
from sqlalchemy import String
from sqlalchemy.ext.mutable import Mutable
class JSONEncodedObj(sqlalchemy.types.TypeDecorator):
    """Column type that stores a Python structure as a JSON-encoded string.

    Values are serialized with ``simplejson.dumps`` before being bound to a
    statement and parsed with ``simplejson.loads`` when read back. ``None``
    is passed through unchanged in both directions. This type performs no
    change tracking of its own.
    """

    impl = String

    def process_bind_param(self, value, dialect):
        """Return *value* serialized to JSON text, or ``None`` if it is ``None``."""
        if value is None:
            return None
        return simplejson.dumps(value)

    def process_result_value(self, value, dialect):
        """Return the Python data parsed from JSON text *value*, or ``None``."""
        return None if value is None else simplejson.loads(value)
class MutationObj(Mutable):
    """Base class for change-tracking wrappers around JSON-style values.

    ``coerce`` turns plain dicts and lists into ``MutationDict`` /
    ``MutationList``; any other value (scalars, ``None``) is stored as-is.
    Because non-``MutationObj`` values are allowed on the attribute, the
    event listeners below check ``isinstance(..., cls)`` before touching
    ``_parents``.
    """

    @classmethod
    def coerce(cls, key, value):
        """Wrap plain dicts and lists in their tracking counterparts.

        Args:
            key: Key of the mapped attribute the value belongs to.
            value: Incoming value of any type.

        Returns:
            A ``MutationDict`` or ``MutationList`` built from *value* when it
            is a ``dict`` or ``list`` that is not already wrapped; otherwise
            *value* itself, unchanged.
        """
        if isinstance(value, dict) and not isinstance(value, MutationDict):
            return MutationDict.coerce(key, value)
        if isinstance(value, list) and not isinstance(value, MutationList):
            return MutationList.coerce(key, value)
        return value

    @classmethod
    def _listen_on_attribute(cls, attribute, coerce, parent_cls):
        """Register the event listeners that keep ``_parents`` up to date.

        Args:
            attribute: Mapped attribute to watch; its ``key`` and ``class_``
                are used.
            coerce: When true, values found at load/refresh time are passed
                through ``cls.coerce`` and written back to the state dict.
            parent_cls: Class the listeners are being set up for. Nothing is
                registered unless it is ``attribute.class_``.
        """
        key = attribute.key
        if parent_cls is not attribute.class_:
            return

        # rely on "propagate" here
        parent_cls = attribute.class_

        def on_load(state, *args):
            """Coerce the value present after a load/refresh and link it."""
            val = state.dict.get(key, None)
            if val is None:
                # Either the key is absent from state.dict or the value is
                # None. Writing None back would add the key to state.dict
                # even though nothing was loaded for it, so leave it alone.
                return
            if coerce:
                val = cls.coerce(key, val)
                state.dict[key] = val
            if isinstance(val, cls):
                val._parents[state.obj()] = key

        def on_set(target, value, oldvalue, initiator):
            """Coerce an assigned value and move the parent link to it."""
            if value is oldvalue:
                # Same object re-assigned: registering the parent and then
                # popping it from ``oldvalue`` would remove the link we just
                # added and silently stop change tracking.
                return value
            if not isinstance(value, cls):
                value = cls.coerce(key, value)
            if isinstance(value, cls):
                value._parents[target.obj()] = key
            if isinstance(oldvalue, cls):
                oldvalue._parents.pop(target.obj(), None)
            return value

        def on_pickle(state, state_dict):
            """Stash the tracked value so its parent link can be rebuilt."""
            val = state.dict.get(key, None)
            if isinstance(val, cls):
                state_dict.setdefault('ext.mutable.values', []).append(val)

        def on_unpickle(state, state_dict):
            """Re-link every stashed value to the unpickled parent object."""
            for val in state_dict.get('ext.mutable.values', ()):
                val._parents[state.obj()] = key

        listen = sqlalchemy.event.listen
        listen(parent_cls, 'load', on_load, raw=True, propagate=True)
        listen(parent_cls, 'refresh', on_load, raw=True, propagate=True)
        listen(attribute, 'set', on_set, raw=True, retval=True, propagate=True)
        listen(parent_cls, 'pickle', on_pickle, raw=True, propagate=True)
        listen(parent_cls, 'unpickle', on_unpickle, raw=True, propagate=True)
class MutationDict(MutationObj, dict):
    """``dict`` subclass that calls ``changed()`` after every in-place change.

    Values stored through the overridden mutators are passed through
    ``MutationObj.coerce`` first, so nested plain dicts/lists are wrapped
    as well.
    """

    # Attribute key handed to MutationObj.coerce for nested values. It is set
    # per instance by coerce(); the class-level default keeps instances that
    # were constructed directly from failing with AttributeError.
    _key = None

    @classmethod
    def coerce(cls, key, value):
        """Convert plain dictionary to MutationDict.

        Args:
            key: Key of the mapped attribute; remembered on the result and
                used when coercing nested values.
            value: Mapping whose ``items()`` are copied, each value being
                coerced recursively.

        Returns:
            A new ``MutationDict``.
        """
        self = MutationDict(
            (k, MutationObj.coerce(key, v)) for (k, v) in value.items())
        self._key = key
        return self

    def __setitem__(self, key, value):
        """Store the coerced *value* under *key* and report the change."""
        dict.__setitem__(self, key, MutationObj.coerce(self._key, value))
        self.changed()

    def __delitem__(self, key):
        """Delete *key* and report the change."""
        dict.__delitem__(self, key)
        self.changed()

    def update(self, *args, **kwargs):
        """Merge entries like ``dict.update``, coercing each value."""
        for k, v in dict(*args, **kwargs).items():
            dict.__setitem__(self, k, MutationObj.coerce(self._key, v))
        self.changed()

    def setdefault(self, key, default=None):
        """Insert the coerced *default* if *key* is missing; return the stored value."""
        if key not in self:
            # Goes through __setitem__, which coerces and reports the change.
            self[key] = default
        return dict.__getitem__(self, key)

    def pop(self, *args):
        """Remove a key like ``dict.pop`` and report the change."""
        result = dict.pop(self, *args)
        self.changed()
        return result

    def popitem(self):
        """Remove and return an item like ``dict.popitem``; report the change."""
        result = dict.popitem(self)
        self.changed()
        return result

    def clear(self):
        """Remove every item and report the change."""
        dict.clear(self)
        self.changed()
class MutationList(MutationObj, list):
    """``list`` subclass that calls ``changed()`` after every in-place change.

    Elements added through the overridden mutators are passed through
    ``MutationObj.coerce`` first, so nested plain dicts/lists are wrapped
    as well.
    """

    # Attribute key handed to MutationObj.coerce for nested values. It is set
    # per instance by coerce(); the class-level default keeps instances that
    # were constructed directly from failing with AttributeError.
    _key = None

    @classmethod
    def coerce(cls, key, value):
        """Convert plain list to MutationList.

        Args:
            key: Key of the mapped attribute; remembered on the result and
                used when coercing nested values.
            value: Iterable whose elements are copied, each being coerced
                recursively.

        Returns:
            A new ``MutationList``.
        """
        self = MutationList((MutationObj.coerce(key, v) for v in value))
        self._key = key
        return self

    def __setitem__(self, idx, value):
        """Assign an element or a slice, coercing what is stored."""
        if isinstance(idx, slice):
            # Slice assignment (the only slice path on Python 3): coerce each
            # element instead of the iterable as a whole, so elements of
            # tuples/generators are wrapped too.
            value = [MutationObj.coerce(self._key, v) for v in value]
        else:
            value = MutationObj.coerce(self._key, value)
        list.__setitem__(self, idx, value)
        self.changed()

    def __setslice__(self, start, stop, values):
        """Python 2 simple-slice assignment hook; coerces every element."""
        list.__setslice__(
            self, start, stop,
            (MutationObj.coerce(self._key, v) for v in values))
        self.changed()

    def __delitem__(self, idx):
        """Delete an element or a slice and report the change."""
        list.__delitem__(self, idx)
        self.changed()

    def __delslice__(self, start, stop):
        """Python 2 simple-slice deletion hook."""
        list.__delslice__(self, start, stop)
        self.changed()

    def __iadd__(self, values):
        """Support ``lst += values`` with coercion and change reporting."""
        self.extend(values)
        return self

    def __imul__(self, count):
        """Support ``lst *= count`` with change reporting."""
        list.__imul__(self, count)
        self.changed()
        return self

    def append(self, value):
        """Append the coerced *value* and report the change."""
        list.append(self, MutationObj.coerce(self._key, value))
        self.changed()

    def insert(self, idx, value):
        """Insert the coerced *value* before *idx* and report the change."""
        list.insert(self, idx, MutationObj.coerce(self._key, value))
        self.changed()

    def extend(self, values):
        """Append every element of *values*, coerced, and report the change."""
        list.extend(self, (MutationObj.coerce(self._key, v) for v in values))
        self.changed()

    def pop(self, *args, **kw):
        """Remove and return an element like ``list.pop``; report the change."""
        value = list.pop(self, *args, **kw)
        self.changed()
        return value

    def remove(self, value):
        """Remove the first occurrence of *value* and report the change."""
        list.remove(self, value)
        self.changed()

    def sort(self, *args, **kw):
        """Sort in place like ``list.sort`` and report the change."""
        list.sort(self, *args, **kw)
        self.changed()

    def reverse(self):
        """Reverse in place and report the change."""
        list.reverse(self)
        self.changed()

    def clear(self):
        """Remove every element and report the change."""
        # list.clear() does not exist on Python 2; a full-slice delete works
        # on both major versions.
        list.__delitem__(self, slice(None))
        self.changed()
def JSONAlchemy(sqltype):
    """Build a JSON column type that encodes/decodes on the fly.

    ``sqltype`` is the string type for the underlying DB column. Typical
    use::

        Column(JSONAlchemy(Text(600)))

    A ``JSONEncodedObj`` subclass whose ``impl`` is ``sqltype`` is created
    on each call and passed to ``MutationObj.as_mutable``; that call's
    result is returned.
    """
    class _JSONEncodedObj(JSONEncodedObj):
        # Same JSON (de)serialization, stored in the caller-chosen type.
        impl = sqltype

    tracked_type = MutationObj.as_mutable(_JSONEncodedObj)
    return tracked_type
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment