Last active
May 31, 2021 09:37
-
-
Save silenius/77466fc260a9cb0f25025bffbf3339cd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from sqlalchemy import orm | |
from sqlalchemy import sql | |
from sqlalchemy import * | |
from sqlalchemy.orm import * | |
from sqlalchemy.orm.collections import attribute_mapped_collection | |
from sqlalchemy.ext.declarative import declarative_base | |
from sqlalchemy.ext.associationproxy import association_proxy | |
from sqlalchemy.ext.hybrid import hybrid_property | |
import unittest | |
Base = declarative_base() | |
log = logging.getLogger(__name__) | |
# Fake stuff for testing | |
_fake_ids = { | |
'document': 1, | |
'event': 2, | |
'folder': 3 | |
} | |
def get_current_locale(): | |
return 'fr' | |
def get_default_locale(): | |
return 'en' | |
def setup_relationships(content_cls, translation_cls, | |
current_locale=get_current_locale, | |
default_locale=get_default_locale): | |
'''Helper to setup translations''' | |
log.info('Adding translation properties: %s to %s', content_cls, | |
translation_cls) | |
content_mapper = orm.class_mapper(content_cls) | |
translation_mapper = orm.class_mapper(translation_cls) | |
current_locale = sql.bindparam(None, callable_=current_locale, | |
type_=String()) | |
default_locale = sql.bindparam(None, callable_=default_locale, | |
type_=String()) | |
partition = sql.select([ | |
translation_cls, | |
sql.func.row_number().over( | |
order_by=[ | |
sql.desc(translation_cls.language_id == current_locale), | |
sql.desc(translation_cls.language_id == default_locale) | |
], | |
partition_by=translation_cls.content_id | |
).label('index') | |
], use_labels=True).where( | |
sql.and_( | |
translation_cls.language_id.in_((current_locale, default_locale)) | |
) | |
).alias() | |
partition_alias = orm.aliased(translation_cls, partition) | |
# First, add properties on the Content-like class | |
content_name = content_cls.__name__.lower() | |
content_current_translation = ( | |
'{}_current_translation'.format(content_name) | |
) | |
content_translations = ( | |
'{}_translations'.format(content_name) | |
) | |
is_base_mapper = content_mapper.base_mapper is content_mapper | |
content_mapper.add_properties({ | |
content_current_translation: orm.relationship( | |
partition_alias, | |
primaryjoin=sql.and_( | |
orm.foreign(partition_alias.content_id) == content_cls.id, | |
partition.c.index == 1, | |
), | |
lazy='noload' if is_base_mapper else 'joined', | |
uselist=False, | |
innerjoin=True, | |
viewonly=True, | |
bake_queries=False, | |
), | |
content_translations: orm.relationship( | |
lambda: translation_cls, | |
cascade='all, delete-orphan', | |
innerjoin=True, | |
lazy='noload' if is_base_mapper else 'select', | |
bake_queries=False, | |
backref=orm.backref( | |
content_name, | |
innerjoin=True, | |
uselist=False, | |
), | |
collection_class=attribute_mapped_collection('language_id') | |
) | |
}) | |
language_name = '{}_language'.format(content_name) | |
translation_mapper.add_properties({ | |
language_name: orm.relationship( | |
Language, | |
lazy='noload' if is_base_mapper else 'select', | |
innerjoin=True, | |
uselist=False, | |
backref=orm.backref( | |
content_translations, | |
cascade='all, delete-orphan' | |
) | |
) | |
}) | |
content_cls.translations = getattr(content_cls, content_translations) | |
content_cls.current_translation = getattr(content_cls, content_current_translation) | |
translation_cls.language = getattr(translation_cls, language_name) | |
def setup_hybrids(cls, name, translation_cls, | |
current_locale=get_current_locale, default=None): | |
def _fget(self): | |
return getattr(self.current_translation, name, default) | |
def _fset(self, value): | |
locale_name = current_locale() | |
trans = self.translations.setdefault( | |
locale_name, | |
translation_cls(language_id=locale_name) | |
) | |
setattr(trans, name, value) | |
def _expr(_cls): | |
return getattr(translation_cls, name) | |
log.info('Adding hybrid attribute: %s.%s', cls, name) | |
prop = hybrid_property(fget=_fget, fset=_fset, expr=_expr) | |
setattr(cls, name, prop) | |
############ | |
# ENTITIES # | |
############ | |
class ContentType(Base): | |
__tablename__ = 'content_type' | |
id = Column(Integer, primary_key=True) | |
name = Column(String(50), nullable=False) | |
def __init__(self, id, name): | |
self.id = id | |
self.name = name | |
class Language(Base): | |
__tablename__ = 'language' | |
id = Column(String(2), primary_key=True) | |
name = Column(String(50), nullable=False) | |
def __init__(self, id, name): | |
self.id = id | |
self.name = name | |
class Content(Base): | |
__tablename__ = 'content' | |
id = Column(Integer, primary_key=True) | |
content_type_id = Column(Integer, ForeignKey('content_type.id'), | |
nullable=False) | |
container_id = Column(Integer, ForeignKey('folder.content_id')) | |
parent = orm.relationship( | |
lambda: Folder, | |
foreign_keys=lambda: Content.container_id, | |
innerjoin=True, | |
uselist=False, | |
backref=orm.backref('children', cascade='all, delete-orphan') | |
) | |
content_type = orm.relationship( | |
ContentType, | |
lazy='joined', | |
innerjoin=True, | |
uselist=False | |
) | |
__mapper_args__ = { | |
'polymorphic_on': content_type_id | |
} | |
class Folder(Content): | |
__tablename__ = 'folder' | |
content_id = Column(Integer, ForeignKey('content.id'), primary_key=True) | |
some_folder_col = Column(Integer, nullable=True) | |
__mapper_args__ = { | |
'polymorphic_identity': _fake_ids['folder'], | |
'inherit_condition': content_id == Content.id | |
} | |
def __init__(self, title, description=None, some_folder_col=None): | |
self.some_folder_col = some_folder_col | |
locale = get_current_locale() | |
self.translations[locale] = FolderTranslation( | |
title=title, content_type_id=_fake_ids['folder'], | |
description=description | |
) | |
class Document(Content): | |
__tablename__ = 'document' | |
content_id = Column(Integer, ForeignKey('content.id'), primary_key=True) | |
some_doc_col = Column(String, nullable=True) | |
__mapper_args__ = { | |
'polymorphic_identity': _fake_ids['document'], | |
} | |
def __init__(self, title, body, description=None, locale=None): | |
if locale is None: | |
locale = get_current_locale() | |
self.translations[locale] = DocumentTranslation( | |
body=body, title=title, content_type_id=_fake_ids['document'], | |
description=description, language_id=locale | |
) | |
####################### | |
# TRANSLATION CLASSES # | |
####################### | |
class ContentTranslation(Base): | |
__tablename__ = 'content_translation' | |
language_id = Column(String(2), ForeignKey('language.id'), | |
primary_key=True) | |
content_id = Column(Integer, ForeignKey('content.id'), primary_key=True) | |
title = Column(String, nullable=False) | |
description = Column(String) | |
content_type = association_proxy("content", "content_type") | |
__mapper_args__ = { | |
'polymorphic_on': | |
select([Content.content_type_id]).where(content_id == Content.id).\ | |
correlate_except(Content).as_scalar() | |
} | |
def __init__(self, **kwargs): | |
if 'language_id' not in kwargs: | |
self.language_id = get_current_locale() | |
for k, v in kwargs.items(): | |
setattr(self, k, v) | |
class FolderTranslation(ContentTranslation): | |
__tablename__ = 'folder_translation' | |
language_id = Column(String(2), ForeignKey('language.id'), | |
primary_key=True) | |
content_id = Column(Integer, ForeignKey('folder.content_id'), primary_key=True) | |
__table_args__ = ( | |
ForeignKeyConstraint( | |
['language_id', 'content_id'], | |
['content_translation.language_id', | |
'content_translation.content_id'] | |
), | |
) | |
__mapper_args__ = { | |
'polymorphic_identity': _fake_ids['folder'], | |
} | |
class DocumentTranslation(ContentTranslation): | |
__tablename__ = 'document_translation' | |
language_id = Column(String(2), ForeignKey('language.id'), | |
primary_key=True) | |
content_id = Column(Integer, ForeignKey('document.content_id'), | |
primary_key=True) | |
body = Column(String, nullable=False) | |
__table_args__ = ( | |
ForeignKeyConstraint( | |
['language_id', 'content_id'], | |
['content_translation.language_id', | |
'content_translation.content_id'] | |
), | |
) | |
__mapper_args__ = { | |
'polymorphic_identity': _fake_ids['document'] | |
} | |
e = create_engine("sqlite://", echo=False) | |
Base.metadata.create_all(e) | |
setup_relationships(Content, ContentTranslation) | |
setup_relationships(Folder, FolderTranslation) | |
setup_relationships(Document, DocumentTranslation) | |
for col in ('title', 'description'): | |
setup_hybrids(Content, col, ContentTranslation) | |
for col in ('title', 'description'): | |
setup_hybrids(Folder, col, FolderTranslation) | |
for col in ('title', 'description', 'body'): | |
setup_hybrids(Document, col, DocumentTranslation) | |
class FooTest(unittest.TestCase): | |
@classmethod | |
def setup_class(cls): | |
s = Session(e) | |
for name, fake_id in _fake_ids.items(): | |
s.add(ContentType(fake_id, name)) | |
s.add(Language('fr', 'Français')) | |
s.add(Language('en', 'English')) | |
root = Folder('root') | |
s.add(root) | |
subroot = Folder('subroot') | |
root.children.append(subroot) | |
doc1 = Document('doc1_title_fr', 'doc1_body_fr') | |
doc2 = Document('doc2_title_fr', 'doc2_body_fr') | |
doc3 = Document('doc3_title_en', 'doc3_body_en', locale='en') | |
subroot.children.append(doc1) | |
subroot.children.append(doc2) | |
subroot.children.append(doc3) | |
trans_fr = DocumentTranslation( | |
title='doc3_title_fr', | |
description='doc3_description_fr', | |
body='doc3_body_fr' | |
) | |
doc3.translations['fr'] = trans_fr | |
s.flush() | |
s.commit() | |
cls.doc3_id = doc3.id | |
s.close() | |
def test_one(self): | |
s = Session(e) | |
foo = s.query(Document).get(self.doc3_id) | |
assert foo.translations['fr'].title == 'doc3_title_fr' | |
assert foo.translations['en'].title == 'doc3_title_en' | |
assert foo.translations['fr'].body == 'doc3_body_fr' | |
assert foo.translations['en'].body == 'doc3_body_en' | |
assert foo.description == 'doc3_description_fr' | |
s.close() | |
def test_two(self): | |
s = Session(e) | |
q = s.query(Document).join(Document.translations).filter(Document.title=='doc1_title_fr') | |
foo = q.one() | |
assert foo.title == 'doc1_title_fr' | |
s.close() | |
def test_three(self): | |
s = Session(e) | |
q = s.query(Document).join(DocumentTranslation).filter(Document.title.like('doc3%')).order_by(Document.title) | |
foo = q.all() | |
assert len(foo) == 1 | |
assert foo[0].title == 'doc3_title_fr' | |
assert foo[0].translations['en'].title == 'doc3_title_en' | |
assert foo[0].translations['fr'].title == 'doc3_title_fr' | |
s.close() | |
def test_four(self): | |
s = Session(e) | |
foo = s.query(Document).filter(Document.translations.any(language_id='fr')).all() | |
assert len(foo) == 3 | |
s.close() | |
def test_five(self): | |
s = Session(e) | |
foo = DocumentTranslation( | |
title='doc2_title_nl', body='doc2_body_nl', language_id='nl' | |
) | |
doc2 = s.query(Document).filter(Document.translations.any(title='doc2_title_fr')).one() | |
assert doc2.title == 'doc2_title_fr' | |
doc2.translations['nl'] = foo | |
s.close() | |
def test_six(self): | |
s = Session(e) | |
q = s.query(Document).join(Document.current_translation).options( | |
orm.contains_eager(Document.current_translation) | |
).filter(Document.title.like('doc3%')).order_by(Document.title) | |
print(q) | |
foo = q.all() | |
assert len(foo) == 1 | |
assert foo[0].title == 'doc3_title_fr' | |
assert foo[0].translations['en'].title == 'doc3_title_en' | |
assert foo[0].translations['fr'].title == 'doc3_title_fr' | |
s.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment