Last active
November 8, 2016 13:05
-
-
Save absent1706/13a5bc5ec88593d0aab51782781a189c to your computer and use it in GitHub Desktop.
DbSession + get updated/deleted, i.e. old values of some ORM attribute, say, KnowMix.attachment_id
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sqlalchemy.orm import sessionmaker, Session | |
class DbSession(Session): | |
def get_old(self, attr): | |
''' gets all old(updated/deleted) values of attribute ''' | |
parent_class = attr.parent.class_ | |
# take all attr values from deleted entities | |
# and all changed attr values from dirty entities | |
result = {obj: getattr(obj, attr.key) for obj in self.deleted | |
if isinstance(obj, parent_class)} | |
result.update( | |
{obj: sa.orm.attributes.get_history(obj, attr.key).deleted[0] | |
for obj in self.dirty | |
if isinstance(obj, parent_class) and | |
sa.orm.attributes.get_history(obj, attr.key).deleted}) | |
return result | |
def get_new(self, attr): | |
''' gets all new values of attribute ''' | |
parent_class = attr.parent.class_ | |
# take all attr values from new entities | |
# and all added attr values from dirty entities | |
result = {obj: getattr(obj, attr.key) for obj in self.new | |
if isinstance(obj, parent_class)} | |
result.update( | |
{obj: sa.orm.attributes.get_history(obj, attr.key).added[0] | |
for obj in self.dirty | |
if isinstance(obj, parent_class) and | |
sa.orm.attributes.get_history(obj, attr.key).added}) | |
return result | |
#### | |
session = sessionmaker(class_=DbSession) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# TODO: move this to session class | |
def get_changed_attribute_values(session, attr): | |
''' | |
gets all changed(updated/deleted) values of attribute | |
code idea is taken from | |
http://sqlalchemy-utils.readthedocs.io/en/latest/_modules/sqlalchemy_utils/listeners.html | |
Example: | |
if attr=KnowMix.final_exam_attachment_id | |
function will return: | |
- all final_exam_attachment_id's of deleted KnowMix'es | |
- all updated final_exam_attachment_id's of updated KnowMix'es | |
''' | |
parent_class = attr.parent.class_ | |
# take all attr values from deleted entities | |
# and all changed attr values from dirty entities | |
return [getattr(obj, attr.key) for obj in session.deleted | |
if isinstance(obj, parent_class)] + \ | |
[sa.orm.attributes.get_history(obj, attr.key).deleted[0] | |
for obj in session.dirty | |
if isinstance(obj, parent_class) and | |
sa.orm.attributes.get_history(obj, attr.key).deleted] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlalchemy as sa | |
from server.models import KnowMix, KnowMixStep | |
from server.processors import get_stored_file | |
__all__ = ['init_stored_files_listeners'] | |
# TODO: use this stuff in all places where stored file is deleted (school, product images etc.) | |
def get_changed_attribute_values(session, attr): | |
''' | |
gets all changed(updated/deleted) values of attribute | |
code idea is taken from | |
http://sqlalchemy-utils.readthedocs.io/en/latest/_modules/sqlalchemy_utils/listeners.html | |
Example: | |
if attr=KnowMix.final_exam_attachment_id | |
function will return: | |
- all final_exam_attachment_id's of deleted KnowMix'es | |
- all updated final_exam_attachment_id's of updated KnowMix'es | |
''' | |
parent_class = attr.parent.class_ | |
# take all attr values from deleted entities | |
# and all changed attr values from dirty entities | |
return [getattr(obj, attr.key) for obj in session.deleted | |
if isinstance(obj, parent_class)] + \ | |
[sa.orm.attributes.get_history(obj, attr.key).deleted[0] | |
for obj in session.dirty | |
if isinstance(obj, parent_class) and | |
sa.orm.attributes.get_history(obj, attr.key).deleted] | |
def autoremove_stored_file_id(attr, sess=sa.orm.Session): | |
''' mark StoredFile as unused (for id attributes) ''' | |
@sa.event.listens_for(sess, 'before_flush') | |
def listener(session, _, __): | |
file_ids = get_changed_attribute_values(session, attr) | |
file_ids = [f for f in file_ids if f] # remove None's | |
for file_id in file_ids: | |
get_stored_file(session, file_id).is_unused = True | |
def autoremove_stored_file_relationship(attr, sess=sa.orm.Session): | |
''' mark StoredFile as unused (for relationships) ''' | |
@sa.event.listens_for(sess, 'before_flush') | |
def listener(session, _, __): | |
files = get_changed_attribute_values(session, attr) | |
files = [f for f in files if f] # remove None's | |
for file in files: | |
file.is_unused = True | |
# handy shortcut for initializing 2 above listeners | |
def autoremove_stored_file(id_attr, relationship_attr): | |
autoremove_stored_file_id(id_attr) | |
autoremove_stored_file_relationship(relationship_attr) | |
def init_stored_files_listeners(): | |
# here are listeners for all stored_file attributes | |
autoremove_stored_file(KnowMix.final_exam_attachment_id, | |
KnowMix.final_exam_attachment) | |
autoremove_stored_file(KnowMixStep.quiz_attachment_id, | |
KnowMixStep.quiz_attachment) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest | |
import server | |
from server.testing_tools import BaseUnitTest | |
from server.models import Base, Column, Integer, String, ForeignKey, \ | |
relationship, StoredFile | |
from server.testing_tools.generate_test_data import save_one, \ | |
generate_stored_file | |
from server.events.listeners.stored_files import autoremove_stored_file_id, \ | |
autoremove_stored_file_relationship | |
class FakeEntity(Base): | |
__tablename__ = 'fake_entity' | |
id = Column(Integer, primary_key=True) | |
stored_file_id = Column(String(200), | |
ForeignKey('stored_file.stored_file_id')) | |
stored_file = relationship('StoredFile') | |
pass | |
class TestAutoremoveStoredFile(BaseUnitTest): | |
def setUp(self, *args, **kwargs): | |
# drop/create fake table | |
engine = server.settings.get_engine() | |
try: | |
FakeEntity.__table__.drop(engine) | |
except: | |
pass | |
FakeEntity.__table__.create(engine) | |
super(TestAutoremoveStoredFile, self).setUp(*args, **kwargs) | |
class TestAutoremoveStoredFileId(TestAutoremoveStoredFile): | |
def test_negative(self): | |
''' check file is NOT automatically deleted if no listener inited ''' | |
# create initial entity and stored file | |
file1 = generate_stored_file(self.db)[0] | |
file2 = generate_stored_file(self.db)[0] | |
entity = save_one(self.db, FakeEntity()) | |
self.db.flush() | |
# assign stored file | |
entity.stored_file_id = file1.stored_file_id | |
self.db.flush() | |
# replace stored file | |
entity.stored_file_id = file2.stored_file_id | |
self.db.flush() | |
# check that it's NOT marked as is_removed | |
file = self.db.query(StoredFile).get(file1.stored_file_id) | |
self.assertFalse(file.is_unused) | |
def test_positive(self): | |
''' check file IS automatically deleted after listener init ''' | |
autoremove_stored_file_id(FakeEntity.stored_file_id, self.db.session) | |
# create initial entity and stored file | |
file1 = generate_stored_file(self.db)[0] | |
file2 = generate_stored_file(self.db)[0] | |
entity = save_one(self.db, FakeEntity()) | |
self.db.flush() | |
# assign stored file | |
entity.stored_file_id = file1.stored_file_id | |
self.db.flush() | |
# replace stored file | |
entity.stored_file_id = file2.stored_file_id | |
self.db.flush() | |
# check that it's marked as is_removed | |
file = self.db.query(StoredFile).get(file1.stored_file_id) | |
self.assertTrue(file.is_unused) | |
class TestAutoremoveStoredFileRelationship(TestAutoremoveStoredFile): | |
def test_negative(self): | |
''' check file is NOT automatically deleted if no listener inited ''' | |
# create initial entity and stored file | |
file1 = generate_stored_file(self.db)[0] | |
file2 = generate_stored_file(self.db)[0] | |
entity = save_one(self.db, FakeEntity()) | |
self.db.flush() | |
# assign stored file | |
entity.stored_file = file1 | |
self.db.flush() | |
# replace stored file | |
entity.stored_file = file2 | |
self.db.flush() | |
# check that it's NOT marked as is_removed | |
file = self.db.query(StoredFile).get(file1.stored_file_id) | |
self.assertFalse(file.is_unused) | |
def test_positive(self): | |
''' check file IS automatically deleted after listener init ''' | |
autoremove_stored_file_relationship(FakeEntity.stored_file, | |
self.db.session) | |
# create initial entity and stored file | |
file1 = generate_stored_file(self.db)[0] | |
file2 = generate_stored_file(self.db)[0] | |
entity = save_one(self.db, FakeEntity()) | |
self.db.flush() | |
# assign stored file | |
entity.stored_file = file1 | |
self.db.flush() | |
# replace stored file | |
entity.stored_file = file2 | |
self.db.flush() | |
# check that it's marked as is_removed | |
file = self.db.query(StoredFile).get(file1.stored_file_id) | |
self.assertTrue(file.is_unused) | |
if __name__ == '__main__': | |
unittest.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment