Skip to content

Instantly share code, notes, and snippets.

@Diggsey
Created February 19, 2016 15:32
Show Gist options
  • Save Diggsey/02b7ba4cefc00a2c7fde to your computer and use it in GitHub Desktop.
Save Diggsey/02b7ba4cefc00a2c7fde to your computer and use it in GitHub Desktop.
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
class OwningContext(object):
    """Link between an embedded value and the object that stores its fields.

    A field called ``key`` on the embedded value is kept on ``instance``
    under the attribute name ``prefix + key``; the methods below translate
    between the two.
    """

    __slots__ = ('instance', 'prefix')

    def __init__(self, instance, prefix):
        self.prefix = prefix
        self.instance = instance

    def field_name(self, key):
        """Return the attribute name on ``instance`` that backs ``key``."""
        full_name = self.prefix + key
        return full_name

    def set_(self, key, value):
        """Assign ``value`` to the owner attribute backing ``key``."""
        target = self.field_name(key)
        setattr(self.instance, target, value)

    def get_(self, key):
        """Read the owner attribute backing ``key``."""
        target = self.field_name(key)
        return getattr(self.instance, target)

    def del_(self, key):
        """Delete the owner attribute backing ``key``."""
        target = self.field_name(key)
        delattr(self.instance, target)
class CompositeColumn(object):
    """Marker declaring that an attribute is made of several embedded fields.

    ``embedded`` is the class describing those fields; ``separator`` is the
    text placed between the attribute's own name and each embedded field
    name when the fields are flattened.
    """

    def __init__(self, embedded, separator='_'):
        self.separator = separator
        self.embedded = embedded

    def get_prefix(self, key):
        """Return the prefix shared by all fields nested under ``key``."""
        prefix = key + self.separator
        return prefix
class EmbeddedAttribute(object):
    """Descriptor that presents a :class:`CompositeColumn` as one attribute.

    On an instance it yields an object of the column's ``embedded`` class
    whose fields are stored on that instance under prefixed names.  On the
    class it yields the descriptor itself, and attribute access on the
    descriptor is translated into the matching prefixed attribute of the
    owner (see :meth:`_resolve`).
    """

    def __init__(self, owner, key, column):
        """Create the descriptor.

        :param owner: the class, or the parent ``EmbeddedAttribute``, this
            attribute is attached to.
        :param key: the attribute name on ``owner``.
        :param column: the ``CompositeColumn`` being exposed.
        """
        self._owner = owner
        self._key = key
        self._column = column
        # Classes built by EmbeddedMeta annotate themselves; for any other
        # owner, hang the nested composites directly on this descriptor so
        # that chained class-level access keeps resolving.
        if not isinstance(owner, EmbeddedMeta):
            EmbeddedAttribute.annotate_obj(self, column.embedded._fields)

    def _resolve(self, k):
        """Look up field ``k`` of this composite on the outermost owner."""
        suffix = self._column.get_prefix(self._key) + k
        if isinstance(self._owner, EmbeddedAttribute):
            # Nested composite: let the parent prepend its own prefix.
            return self._owner._resolve(suffix)
        else:
            return getattr(self._owner, suffix)

    def __get__(self, instance, owner=None):
        """Return the descriptor (class access) or the embedded value."""
        if instance is None:
            return self
        # The embedded object is created on first access and cached in the
        # instance's __dict__ under the attribute's own name.
        if self._key not in instance.__dict__:
            owning_context = OwningContext(instance, self._column.get_prefix(self._key))
            instance.__dict__[self._key] = self._column.embedded(_owning_context=owning_context)
        return instance.__dict__[self._key]

    def __set__(self, instance, value):
        """Copy every declared field of ``value`` into the embedded value."""
        assignee = self.__get__(instance)
        for f in self._column.embedded._fields:
            setattr(assignee, f, getattr(value, f))

    def __getattr__(self, key):
        """Forward unknown attribute names to :meth:`_resolve`.

        :raises AttributeError: for the descriptor's own state names when
            they are not set yet.
        """
        # __getattr__ only runs when normal lookup failed.  If one of our own
        # state attributes is missing (the object was created without
        # __init__, e.g. through __new__), forwarding would make _resolve read
        # that same attribute again and recurse without end.
        if key in ('_owner', '_key', '_column'):
            raise AttributeError(key)
        return self._resolve(key)

    @staticmethod
    def annotate_obj(obj, dict_):
        """Attach an ``EmbeddedAttribute`` to ``obj`` for each composite.

        :param obj: object receiving the attributes.
        :param dict_: mapping of attribute name to declaration; entries that
            are not ``CompositeColumn`` instances are ignored.
        """
        for k, v in dict_.iteritems():
            if isinstance(v, CompositeColumn):
                setattr(obj, k, EmbeddedAttribute(obj, k, v))
class EmbeddedMeta(type):
    """Metaclass that gathers the field declarations of an embeddable class.

    Column and composite declarations are taken out of the class body and
    recorded on the new class as ``_fields``; ``_flat_fields`` receives
    whatever the class's ``_flatten`` produces from them.
    """

    def __new__(mcs, classname, bases, dict_):
        field_types = (sqlalchemy.Column, CompositeColumn)

        # Collect the declarations first, then strip them from the namespace
        # so they do not end up as ordinary class attributes.
        fields = {}
        for name, value in dict_.iteritems():
            if isinstance(value, field_types):
                fields[name] = value
        for name in fields:
            dict_.pop(name)

        cls = type.__new__(mcs, classname, bases, dict_)
        cls._fields = fields
        cls._flat_fields = {}
        cls._flatten(cls._flat_fields)
        # Composite fields come back as descriptors on the finished class.
        EmbeddedAttribute.annotate_obj(cls, fields)
        return cls
class EmbeddedBase(object):
    """Base class for objects whose fields are embedded in another object.

    An instance is either standalone (``_owning_context`` is ``None`` and
    values live on the instance itself) or bound, in which case reads, writes
    and deletes are forwarded through its ``_owning_context``.
    """

    # Python 2 metaclass hook.
    __metaclass__ = EmbeddedMeta

    def __init__(self, _owning_context=None, **kwargs):
        """Create the object and assign ``kwargs`` as attribute values.

        :param _owning_context: context to forward attribute access to, or
            ``None`` for a standalone object.
        :param kwargs: attribute values, assigned with ``setattr``.
        """
        # Stored straight in __dict__ so the assignment bypasses __setattr__.
        self.__dict__['_owning_context'] = _owning_context
        object.__init__(self)
        for k, v in kwargs.iteritems():
            setattr(self, k, v)

    def __setattr__(self, key, value):
        """Route an assignment to a descriptor, the owner, or this object.

        :raises AttributeError: when standalone and ``key`` is not one of the
            flattened field names.
        """
        descriptor = getattr(type(self), key, None)
        if hasattr(descriptor, '__set__'):
            descriptor.__set__(self, value)
            return
        # Read from __dict__ rather than via attribute lookup: on an object
        # whose __init__ has not run, ``self._owning_context`` would fall
        # through to __getattr__ and recurse without end.
        context = self.__dict__.get('_owning_context')
        if context is not None:
            context.set_(key, value)
        elif key in self._flat_fields:
            object.__setattr__(self, key, value)
        else:
            raise AttributeError(key)

    def __getattr__(self, key):
        """Resolve an attribute that normal lookup did not find.

        Bound objects ask the owner; standalone objects report ``None`` for
        any flattened field name that has not been assigned.

        :raises AttributeError: when standalone and ``key`` is not one of the
            flattened field names.
        """
        # See __setattr__ for why this does not use ``self._owning_context``.
        context = self.__dict__.get('_owning_context')
        if context is not None:
            return context.get_(key)
        elif key in self._flat_fields:
            return None
        else:
            raise AttributeError(key)

    def __delattr__(self, key):
        """Delete ``key`` from this object, or from the owner when bound."""
        context = self.__dict__.get('_owning_context')
        if context is None:
            object.__delattr__(self, key)
        else:
            context.del_(key)

    @classmethod
    def __dir__(cls):
        """List only the declared field names."""
        return cls._fields.keys()

    @classmethod
    def _flatten(cls, dict_, prefix=''):
        """Write this class's fields into ``dict_`` under prefixed names.

        :param dict_: mapping that receives ``prefixed name -> column``.
        :param prefix: text prepended to every field name.
        """
        for k, v in cls._fields.iteritems():
            field = prefix + k
            if isinstance(v, CompositeColumn):
                # Nested composite: recurse with the extended prefix.
                v.embedded._flatten(dict_, v.get_prefix(field))
            else:
                # Each destination gets its own copy of the declaration.
                dict_[field] = v.copy()
class CustomMeta(DeclarativeMeta):
    """Declarative metaclass that expands ``CompositeColumn`` attributes.

    Each composite is removed from the class body and replaced by the
    flattened entries of its embedded class before the declarative machinery
    sees the namespace; afterwards the composite names are re-attached to the
    class as ``EmbeddedAttribute`` descriptors.
    """

    def __new__(mcs, classname, bases, dict_):
        composites = {}
        for name, value in dict_.iteritems():
            if isinstance(value, CompositeColumn):
                composites[name] = value

        # ``dict_`` is edited in place: drop the composite, add its columns.
        for name in composites:
            column = dict_.pop(name)
            column.embedded._flatten(dict_, column.get_prefix(name))

        cls = DeclarativeMeta.__new__(mcs, classname, bases, dict_)
        EmbeddedAttribute.annotate_obj(cls, composites)
        return cls
# Declarative base class built with CustomMeta as its metaclass, so that
# CompositeColumn attributes declared on models are handled by CustomMeta.
Base = declarative_base(metaclass=CustomMeta)
class EmbeddedName(EmbeddedBase):
    """Embeddable pair of string columns: ``first`` and ``last``."""

    first = sqlalchemy.Column(sqlalchemy.String)
    last = sqlalchemy.Column(sqlalchemy.String)
class EmbeddedPerson(EmbeddedBase):
    """Embeddable person: a nested ``EmbeddedName`` plus an integer ``age``."""

    # The nested name's fields are prefixed with "name-" (custom separator).
    name = CompositeColumn(EmbeddedName, separator='-')
    age = sqlalchemy.Column(sqlalchemy.Integer)
class Test(Base):
    """Example model embedding two ``EmbeddedPerson`` values in one table."""

    __tablename__ = 'test'
    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
    # Each composite is flattened with its own prefix ("person_a_" and
    # "person_b_", using the default "_" separator).
    person_a = CompositeColumn(EmbeddedPerson)
    person_b = CompositeColumn(EmbeddedPerson)
# Usage example: build a standalone EmbeddedPerson, give it a nested name,
# then assign it to a composite attribute of a model instance.
x = Test()
y = EmbeddedPerson(age=2)
# Assigning to a composite attribute copies the value's fields one by one.
y.name = EmbeddedName(first="hello")
x.person_a = y
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment