Last active
December 11, 2015 19:49
-
-
Save eseifert/4651239 to your computer and use it in GitHub Desktop.
Implementation of a basic SQL backend for Cork based on SQLAlchemy. Requires Cork 0.6 and the commit for configurable backends (commit c48e163bdef42ad3c1ec098f5ebb5444dca48f4b).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import .base_backend | |
try: | |
from sqlalchemy import create_engine, delete, select, \ | |
Column, ForeignKey, Integer, MetaData, String, Table | |
sqlalchemy_available = True | |
except ImportError: | |
sqlalchemy_available = False | |
class SqlRowProxy(dict): | |
def __init__(self, sql_dict, key, *args, **kwargs): | |
dict.__init__(self, *args, **kwargs) | |
self.sql_dict = sql_dict | |
self.key = key | |
def __setitem__(self, key, value): | |
dict.__setitem__(self, key, value) | |
if self.sql_dict is not None: | |
self.sql_dict[self.key] = {key: value} | |
class SqlTable(base_backend.Table): | |
"""Provides dictionary-like access to an SQL table.""" | |
def __init__(self, engine, table, key_col_name): | |
self._engine = engine | |
self._table = table | |
self._key_col = table.c[key_col_name] | |
def _row_to_value(self, row): | |
row_key = row[self._key_col] | |
row_value = SqlRowProxy(self, row_key, | |
((k, row[k]) for k in row.keys() if k != self._key_col.name)) | |
return row_key, row_value | |
def __len__(self): | |
items = list(self.iteritems()) | |
return len(items) | |
def __contains__(self, key): | |
query = select([self._key_col], self._key_col == key) | |
row = self._engine.execute(query).fetchone() | |
return row is not None | |
def __setitem__(self, key, value): | |
if key in self: | |
values = value | |
query = self._table.update().where(self._key_col == key) | |
else: | |
values = {self._key_col.name: key} | |
values.update(value) | |
query = self._table.insert() | |
self._engine.execute(query.values(**values)) | |
def __getitem__(self, key): | |
query = select([self._table], self._key_col == key) | |
row = self._engine.execute(query).fetchone() | |
if row is None: | |
raise KeyError(key) | |
return self._row_to_value(row)[1] | |
def __iter__(self): | |
for key, value in self.iteritems(): | |
yield key | |
def iteritems(self): | |
query = select([self._key_col]) | |
result = self._engine.execute(query) | |
for row in result.fetchall(): | |
yield self._row_to_value(row) | |
def pop(self, key): | |
query = delete(self._table, self._key_col == key) | |
self._engine.execute(query) | |
class SqlSingleValueTable(SqlTable): | |
def __init__(self, engine, table, key_col_name, col_name): | |
SqlTable.__init__(self, engine, table, key_col_name) | |
self._col_name = col_name | |
def _row_to_value(self, row): | |
return row[self._key_col], row[self._col_name] | |
def __setitem__(self, key, value): | |
SqlTable.__setitem__(self, key, {self._col_name: value}) | |
class SqlAlchemyBackend(base_backend.Backend): | |
def __init__(self, db_url, users_tname='users', roles_tname='roles', | |
pending_reg_tname='register', initialize=False): | |
self._engine = create_engine(db_url) | |
self._metadata = MetaData() | |
self._users = Table(users_tname, self._metadata, | |
Column('username', String, primary_key=True), | |
Column('role', ForeignKey('roles.role')), | |
Column('hash', String, nullable=False), | |
Column('email_addr', String), | |
Column('desc', String), | |
Column('creation_date', String, nullable=False) | |
) | |
self._roles = Table(roles_tname, self._metadata, | |
Column('role', String, primary_key=True), | |
Column('level', Integer, nullable=False) | |
) | |
self._pending_reg = Table(pending_reg_tname, self._metadata, | |
Column('code', String, primary_key=True), | |
Column('username', String, nullable=False), | |
Column('role', ForeignKey('roles.role')), | |
Column('hash', String, nullable=False), | |
Column('email_addr', String), | |
Column('desc', String), | |
Column('creation_date', String, nullable=False) | |
) | |
if initialize: | |
self._initialize_storage() | |
self.users = SqlTable(self._engine, self._users, 'username') | |
self.roles = SqlSingleValueTable(self._engine, self._roles, 'role', 'level') | |
self.pending_registrations = SqlTable(self._engine, self._pending_reg, 'code') | |
def _initialize_storage(self): | |
self._metadata.create_all(self._engine) | |
def save_users(self): pass | |
def save_roles(self): pass | |
def save_pending_registrations(self): pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment