Skip to content

Instantly share code, notes, and snippets.

@saghul
Created May 26, 2011 21:13
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save saghul/994096 to your computer and use it in GitHub Desktop.
Serialize SQLite operations with SQLObject and Twisted
# coding=utf8
# Copyright (C) 2011 Saúl Ibarra Corretgé <saghul@gmail.com>
#
__all__ = ['Database', 'DatabaseError']
from threading import Thread
from sqlobject import connectionForURI, sqlhub, SQLObject, StringCol
from twisted.internet import reactor
from twisted.internet.threads import deferToThreadPool
from twisted.python.threadpool import ThreadPool
# Thread pool capped at exactly one thread (minthreads and maxthreads are
# both 1), named 'db-ops'. run_in_db_thread hands its calls to this pool.
pool = ThreadPool(minthreads=1, maxthreads=1, name='db-ops')
# The pool is started as soon as this module is imported.
pool.start()
# Register pool.stop to be called before the reactor's 'shutdown' event.
reactor.addSystemEventTrigger('before', 'shutdown', pool.stop)
def run_in_db_thread(func):
    """Decorator to run DB queries in Twisted's thread pool.

    Calling the decorated function does not execute ``func`` in the calling
    thread. The call is handed to ``deferToThreadPool`` together with the
    module-level ``pool``, and whatever ``deferToThreadPool`` returns is
    returned to the caller.

    :param func: callable to execute on ``pool``.
    :returns: a wrapper that accepts the same positional and keyword
        arguments as ``func`` and forwards them unchanged.
    """
    # Imported here so the decorator carries its own dependency.
    from functools import wraps

    # Without wraps() every decorated function would report its name as
    # "wrapper" and lose its docstring, which hurts logs and introspection.
    @wraps(func)
    def wrapper(*args, **kw):
        return deferToThreadPool(reactor, pool, func, *args, **kw)
    return wrapper
class Users(SQLObject):
    """SQLObject model with three string columns describing a user."""

    # Columns are declared in the same order as before:
    # nickname, full name, e-mail address.
    nickname = StringCol()
    full_name = StringCol()
    email = StringCol()
class DatabaseError(Exception):
    """Exception type exported by this module for database-level failures."""
class Database(object):
    """Entry point for the database operations defined in this module.

    ``initialize``, ``create_user`` and ``get_user_data`` are wrapped with
    ``run_in_db_thread``; calling them returns whatever that decorator's
    wrapper returns instead of the method's own return value.
    """

    def __init__(self, dburi=None):
        """Store the connection URI and schedule the connection setup.

        :param dburi: SQLObject connection URI. When omitted (or otherwise
            falsy) ``'sqlite:/:memory:'`` is used.
        """
        self._uri = dburi or 'sqlite:/:memory:'
        # initialize() is dispatched through run_in_db_thread, so it has not
        # necessarily run when __init__ returns. Give ``connected`` a value
        # now so that reading it early does not raise AttributeError.
        self.connected = False
        self.initialize()

    def _create_table(self, klass):
        """Create the table for ``klass`` unless it already exists.

        The connection's ``debug`` flag is set to True only for the duration
        of ``createTable()`` and restored to its previous value afterwards,
        even when table creation raises.

        :param klass: SQLObject class whose table should exist.
        """
        if klass.tableExists():
            return
        print('Table %s does not exists. Creating it now.' % klass.sqlmeta.table)
        saved = klass._connection.debug
        try:
            klass._connection.debug = True
            klass.createTable()
        finally:
            klass._connection.debug = saved

    @run_in_db_thread
    def initialize(self):
        """Open the connection and make sure the model tables exist.

        Sets ``self.connected`` to True once the connection has been created
        and installed as ``sqlhub.processConnection``. If that step raises,
        the error is printed, ``self.connected`` is set to False and no
        tables are created.
        """
        try:
            conn = connectionForURI(self._uri)
            sqlhub.processConnection = conn
        except Exception as e:
            print('Error connection with the DB: %s' % e)
            self.connected = False
            return
        self.connected = True
        for klass in [Users]:  # We'd initialize all SQLObjects here
            self._create_table(klass)

    @run_in_db_thread
    def create_user(self, nickname, fullname, email):
        """Insert a new ``Users`` row and return the created object.

        :param nickname: value for the ``nickname`` column.
        :param fullname: value for the ``full_name`` column.
        :param email: value for the ``email`` column.
        """
        return Users(nickname=nickname, full_name=fullname, email=email)

    @run_in_db_thread
    def get_user_data(self, nickname):
        """Return the first ``Users`` row whose nickname matches.

        :param nickname: nickname to look up.
        :raises DatabaseError: when the query yields no row.
        """
        try:
            user = Users.selectBy(nickname=nickname)[0]
        except IndexError:
            # An empty result set surfaces as IndexError on [0].
            raise DatabaseError("User %s doesn't exist" % nickname)
        else:
            return user
def main():
    """Create four sample users, then look up 'saghul' and print the outcome."""

    def got_result(user):
        # Success callback: print the fields of the fetched user.
        print('User info:')
        print('\tNickname: %s' % user.nickname)
        print('\tFull name: %s' % user.full_name)
        print('\tEmail address: %s' % user.email)

    def got_error(error):
        # Failure callback: print the message carried by the error object.
        print('Got error! %s' % error.getErrorMessage())

    db = Database()
    # (nickname, full name) pairs; every sample user shares one e-mail address.
    sample_users = (
        ('saghul', 'saghul'),
        ('saghul2', 'saghul2'),
        ('saghu3', 'saghul3'),
        ('saghul4', 'saghul4'),
    )
    for nickname, fullname in sample_users:
        db.create_user(nickname, fullname, 'saghul@gmail.com')
    lookup = db.get_user_data('saghul')
    lookup.addCallbacks(got_result, got_error)
def run_reactor():
    """Run the Twisted reactor with signal-handler installation disabled."""
    reactor.run(installSignalHandlers=False)
def reactor_stop():
    """Sleep for three seconds, then schedule ``reactor.stop``."""
    import time
    time.sleep(3)
    # The stop request goes through callFromThread instead of being called
    # directly from this thread.
    reactor.callFromThread(reactor.stop)
if __name__ == '__main__':
    # The reactor runs in its own thread; this thread issues the demo
    # calls and then requests the stop.
    reactor_thread = Thread(target=run_reactor)
    reactor_thread.start()
    main()
    reactor_stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment