Created
August 11, 2009 22:35
-
-
Save magcius/166165 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlamp | |
import config | |
import sqlalchemy | |
# ===================================================== # | |
meta = sqlalchemy.MetaData() | |
session = sqlalchemy.orm.scoped_session(sqlalchemy.orm.sessionmaker()) | |
def _get_node(node): | |
""" | |
Go to our linked node if we're at a leaf. | |
This means that if we're at "a" "b" "c" "d" | |
and node is "d", node.link should be the "d" in | |
"b" "c" "d", which is not a leaf and should have children | |
""" | |
if node.is_leaf: | |
node = node.link | |
if node is None: | |
raise SentenceTerminated | |
return node | |
def walk_down(node, symbol): | |
return _get_node(node)[symbol] | |
def walk_down_many(node, symbols): | |
node = _get_node(node) | |
for sym in symbols: | |
node = _get_node(node[sym]) | |
return node | |
def walk_up(self, node): | |
parent = node.parent | |
if parent.is_root: | |
return walk_up(node.link) | |
def get_root(): | |
return session.query(RootNode).one() | |
def known_items(): | |
return session.query(LeafNode).count() | |
def symbol_exists(symbol): | |
return session.query(Node) \ | |
.filter_by(Node.symbol == symbol) \ | |
.filter_by( \ | |
(Node.parent.type == "rootnode") | \ | |
(Node.type == "leafnode") \ | |
).count() > 0 | |
def find_backward_start(keyword): | |
return session.query(LeafNode) \ | |
.filter_by(LeafNode.symbol == keyword) | |
# ===================================================== # | |
node_table = sqlalchemy.Table('node', meta, | |
sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True), | |
sqlalchemy.Column('type', sqlalchemy.String, nullable=False), | |
sqlalchemy.Column('symbol', sqlalchemy.String, nullable=False), | |
sqlalchemy.Column('count', sqlalchemy.Integer, nullable=False), | |
sqlalchemy.Column('parent_id', sqlalchemy.ForeignKey('node.id')), | |
) | |
leafnode_table = sqlalchemy.Table('leafnode', meta, | |
sqlalchemy.Column('node_id', sqlalchemy.Integer, sqlalchemy.ForeignKey('node.id'), primary_key=True), | |
sqlalchemy.Column('link_id', sqlalchemy.ForeignKey('node.id'), nullable=True), | |
) | |
# ===================================================== # | |
class Node(object): | |
mp = sqlamp.MPManager(node_table, node_table.c.id, node_table.c.parent_id) | |
is_root = False | |
is_leaf = False | |
def __init__(self, symbol, count=1, parent=None): | |
self.symbol = symbol | |
self.count = count | |
self.parent = parent | |
def __repr__(self): | |
return "<%s [%d] %r>" % (self.__class__.__name__, self.count, self.symbol) | |
class LeafNode(Node): | |
is_leaf = True | |
def __init__(self, symbol, count=1, parent=None, link=None): | |
super(LeafNode, self).__init__(symbol, count, parent) | |
self.link = link | |
def __repr__(self): | |
return "<%s [%d] %r linked to %r>" % (self.__class__.__name__, self.count, self.symbol, self.link) | |
class ParentNode(Node): | |
def __init__(self, symbol, count=1, parent=None): | |
super(ParentNode, self).__init__(symbol, count, parent) | |
def __contains__(self, symbol): | |
if symbol in self.children: | |
return True | |
return len([child for child in self.children if child.symbol == symbol]) > 0 | |
def __getitem__(self, symbol): | |
try: | |
return self.children[symbol] # allow node[0] for the first node | |
except TypeError: | |
return [child for child in self.children if child.symbol == symbol][0] | |
def __setitem__(self, node, name): | |
assert isinstance(symbol, Node) | |
if isinstance(name, basestring): | |
node.name = name | |
self.children.append(node) | |
else: | |
self.children[name] = nod | |
class RootNode(ParentNode): | |
is_root = True | |
class SentenceTerminated(Exception): | |
pass | |
# ===================================================== # | |
node_mapper = sqlalchemy.orm.mapper( | |
Node, node_table, | |
extension=[Node.mp.mapper_extension], | |
polymorphic_on=node_table.c.type, | |
polymorphic_identity='node', | |
properties=dict( | |
parent=sqlalchemy.orm.relation(Node, | |
remote_side=[node_table.c.id], | |
backref=sqlalchemy.orm.backref('children', cascade='all, delete'), | |
cascade='all, delete') | |
) | |
) | |
leafnode_mapper = sqlalchemy.orm.mapper( | |
LeafNode, leafnode_table, | |
inherits=Node, | |
polymorphic_identity='leafnode', | |
inherit_condition=node_table.c.id==leafnode_table.c.node_id, | |
properties=dict( | |
link=sqlalchemy.orm.relation(Node, backref='backlink', primaryjoin=node_table.c.id==leafnode_table.c.link_id) | |
) | |
) | |
parentnode_mapper = sqlalchemy.orm.mapper(ParentNode, inherits=Node, polymorphic_identity='parentnode') | |
rootnode_mapper = sqlalchemy.orm.mapper(RootNode, inherits=ParentNode, polymorphic_identity='rootnode') | |
# ===================================================== # | |
def create_tables(): | |
meta.create_all() | |
root = RootNode("") | |
session.add(root) | |
session.commit() | |
def init_storage(conn_str): | |
engine = sqlalchemy.create_engine(conn_str) | |
engine.echo = True | |
session.configure(bind=engine) | |
meta.engine = meta.bind = engine | |
if not engine.has_table(node_table.name): | |
create_tables() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment