Created
April 15, 2009 04:46
-
-
Save storborg/95606 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Query for a database object that relates to a specific set of other objects.
""" | |
__author__ = 'scott torborg (scotttorborg.com)' | |
from sqlalchemy import * | |
from sqlalchemy.orm import * | |
from sqlalchemy.ext.declarative import declarative_base | |
# Shared metadata bound to a SQLite engine; echo is switched on so the
# SQL emitted by the queries below is logged.
metadata = MetaData('sqlite://')
metadata.bind.echo = True

# Association table for the many-to-many link between foos and bars.
# The column type is passed as None so that it is taken from the column
# the ForeignKey refers to.
assocs_table = Table(
    'assocs',
    metadata,
    Column('foo_id', None, ForeignKey('foos.id')),
    Column('bar_id', None, ForeignKey('bars.id')),
)

# Declarative base class sharing the same metadata as the table above.
Base = declarative_base(metadata=metadata)
class Foo(Base):
    """Mapped class for the ``foos`` table, linked to ``Bar`` via ``assocs_table``."""

    __tablename__ = 'foos'

    id = Column(Integer(unsigned=True), primary_key=True)
    text = Column(String(20))

    # Many-to-many collection of Bar objects through the association
    # table; the backref exposes the reverse collection as ``Bar.foos``.
    bars = relation('Bar', secondary=assocs_table, backref='foos')

    def __init__(self, text):
        """Create a Foo carrying ``text``; ``id`` is not set here."""
        self.text = text

    def __repr__(self):
        # %s rather than %d for the id: __init__ never sets it, so it can
        # still be None, and %d would raise TypeError on None. Integer
        # ids render exactly as before.
        return "<Foo: id %s, text %s>" % (self.id, self.text)
class Bar(Base):
    """Mapped class for the ``bars`` table, holding one integer ``data`` value."""

    __tablename__ = 'bars'

    id = Column(Integer(unsigned=True), primary_key=True)
    data = Column(Integer)

    def __init__(self, data):
        """Create a Bar carrying ``data``; ``id`` is not set here."""
        self.data = data

    def __repr__(self):
        # %s rather than %d: __init__ never sets the id and ``data`` may
        # be passed as None, and %d raises TypeError on None. Integer
        # values render exactly as before.
        return "<Bar: id %s, data %s>" % (self.id, self.data)
######
# use it !

# Create the tables and open a session.
metadata.create_all()
sess = create_session()

foo1, foo2, foo3, foo4 = [Foo(word) for word in ('the', 'quick', 'brown', 'fox')]
bar1, bar2, bar3, bar4, bar5 = [Bar(number) for number in (1, 2, 3, 4, 5)]

# Which bars each foo is linked to, in append order; foo4 gets none.
for owner, linked in (
    (foo1, (bar1, bar2, bar3)),
    (foo2, (bar1, bar2, bar4)),
    (foo3, (bar5, bar2, bar4)),
):
    for member in linked:
        owner.bars.append(member)

# Foos are added before bars.
for instance in (foo1, foo2, foo3, foo4, bar1, bar2, bar3, bar4, bar5):
    sess.add(instance)

# Write everything to the database, then empty the session.
sess.flush()
sess.clear()
# Load every row of each mapped class and print it under a heading.
# Single-argument print(...) produces the same output under the Python 2
# print statement.
for heading, entity in (("all foos", Foo), ("all bars", Bar)):
    print(heading)
    for row in sess.query(entity).all():
        print(row)
# The Foo that every query below is expected to return.
desired = sess.query(Foo).get(2)

print("looking for just the 1,2,4 set, which should be foo2")
look_for = [bar1, bar2, bar4]
q = sess.query(Foo)
# One contains() filter per Bar: a Foo must be linked to all of them.
for lf in look_for:
    q = q.filter(Foo.bars.contains(lf))
# Compare the whole result, not just the first row, so that extra matches
# or an empty result fail the assertion.
assert q.all() == [desired]

print("try and query with just ids instead of objects")
look_for = [1, 2, 4]
q = sess.query(Foo)
# Same idea, but each filter matches a related Bar by its id.
for lf in look_for:
    q = q.filter(Foo.bars.any(id=lf))
assert q.all() == [desired]
print("try and make the query from scratch by aliasing Bar")
# build a set of aliases, one for each element we're looking for
aliases = [aliased(Bar) for _ in look_for]
q = sess.query(Foo).join(*[(a, Foo.bars) for a in aliases])
# Pin each alias to one of the wanted ids.
for a, lf in zip(aliases, look_for):
    q = q.filter(a.id == lf)
# Compare the whole result, not just the first row, so that extra matches
# or an empty result fail the assertion.
assert q.all() == [desired]

print("do it without pre-generating aliases")
q = sess.query(Foo)
# Create, join and filter a fresh alias for each wanted id in one pass.
for lf in look_for:
    a = aliased(Bar)
    q = q.join((a, Foo.bars)).filter(a.id == lf)
assert q.all() == [desired]
print("try and make the query from scratch without the extra Bar join")
# Join aliases of the association table itself, one per wanted id, and
# filter on its bar_id column so the bars table is not joined.
aliases = [assocs_table.alias() for _ in look_for]
q = sess.query(Foo).join(*aliases)
for a, lf in zip(aliases, look_for):
    q = q.filter(a.c.bar_id == lf)
# Compare the whole result, not just the first row, so that extra matches
# or an empty result fail the assertion.
assert q.all() == [desired]

print("do it without pre-generating aliases")
q = sess.query(Foo)
# Create, join and filter a fresh association-table alias per wanted id.
for lf in look_for:
    a = assocs_table.alias()
    q = q.join(a).filter(a.c.bar_id == lf)
assert q.all() == [desired]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment