Skip to content

Instantly share code, notes, and snippets.

@FSX
Created October 5, 2012 11:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save FSX/3839266 to your computer and use it in GitHub Desktop.
Save FSX/3839266 to your computer and use it in GitHub Desktop.
Testing asynchronous Psycopg2.
import sys
from functools import partial
import psycopg2
from psycopg2 import OperationalError
from psycopg2.extensions import (POLL_OK, POLL_READ, POLL_WRITE, POLL_ERROR,
connection as base_connection, cursor as base_cursor)
from tornado.ioloop import IOLoop
io_poll = {
IOLoop.NONE: 'IOLoop.NONE',
IOLoop.READ: 'IOLoop.READ',
IOLoop.WRITE: 'IOLoop.WRITE',
IOLoop.ERROR: 'IOLoop.ERROR'
}
psy_poll = {
POLL_OK: 'POLL_OK',
POLL_READ: 'POLL_READ',
POLL_WRITE: 'POLL_WRITE',
POLL_ERROR: 'POLL_ERROR'
}
state_map = {
# POLL_OK: IOLoop.NONE,
POLL_READ: IOLoop.READ,
POLL_WRITE: IOLoop.WRITE,
POLL_ERROR: IOLoop.ERROR
}
def _dummy_callback(cursor):
pass
class Connection:
def __init__(self, dsn, connection_factory=base_connection, callback=None):
self.connection = psycopg2.connect(dsn, async=1,
connection_factory=connection_factory)
self.ioloop = IOLoop.instance()
self.callback = partial(callback, self)
self.counter = 0
state = self.connection.poll()
self.ioloop.add_handler(self.connection.fileno(), self.io_callback,
IOLoop.ERROR | state_map.get(state, IOLoop.ERROR))
def io_callback(self, fd, events):
self.counter += 1
print(self.counter)
state = self.connection.poll()
self.ioloop.update_handler(fd, state_map.get(state, 0))
if state == POLL_OK:
if self.callback is not None:
self.callback()
elif state not in (POLL_OK, POLL_READ, POLL_WRITE):
raise OperationalError('poll() returned {0}'.format(state))
def execute(self, operation, parameters=(), cursor_factory=base_cursor,
callback=_dummy_callback):
cursor = self.connection.cursor(cursor_factory=cursor_factory)
cursor.execute(operation, parameters)
# Set callback and connection state
self.callback = partial(callback, cursor)
state = self.connection.poll()
self.ioloop.update_handler(self.connection.fileno(),
state_map.get(state, IOLoop.ERROR))
def test_conn_callback(connection):
print('Test conn: ', connection)
connection.execute('SELECT 1;', callback=test_curs_callback)
def test_curs_callback(cursor):
print('Test curs: ', cursor)
print(cursor.fetchall())
if __name__ == '__main__':
dsn = 'dbname=momoko_db user=momoko_user password=password host=localhost port=5432'
Connection(dsn, callback=test_conn_callback)
IOLoop.instance().start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment