public
Created

Testing asynchronous Psycopg2.

  • Download Gist
async_psycopg2_test.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
import sys
from functools import partial
 
import psycopg2
from psycopg2 import OperationalError
from psycopg2.extensions import (POLL_OK, POLL_READ, POLL_WRITE, POLL_ERROR,
connection as base_connection, cursor as base_cursor)
 
from tornado.ioloop import IOLoop
 
 
# Readable names for the Tornado IOLoop event constants, keyed by the constant.
# Not referenced elsewhere in the visible code; presumably kept for debug output.
io_poll = {
    getattr(IOLoop, event_name): 'IOLoop.' + event_name
    for event_name in ('NONE', 'READ', 'WRITE', 'ERROR')
}
 
# Readable names for the psycopg2 poll() result constants, keyed by the constant.
# Not referenced elsewhere in the visible code; presumably kept for debug output.
psy_poll = dict(zip(
    (POLL_OK, POLL_READ, POLL_WRITE, POLL_ERROR),
    ('POLL_OK', 'POLL_READ', 'POLL_WRITE', 'POLL_ERROR'),
))
 
# Translation from a psycopg2 poll() state to the IOLoop event mask to wait on.
# POLL_OK is deliberately left unmapped: every lookup goes through
# ``state_map.get(state, fallback)`` so each call site picks its own fallback.
state_map = {}
state_map[POLL_READ] = IOLoop.READ
state_map[POLL_WRITE] = IOLoop.WRITE
state_map[POLL_ERROR] = IOLoop.ERROR
 
 
def _dummy_callback(cursor):
    """Ignore *cursor* and do nothing.

    Serves as the default ``callback`` of ``Connection.execute`` so a query
    can be issued without supplying a completion handler.
    """
    return None
 
 
class Connection:
    """Asynchronous psycopg2 connection polled from the Tornado IOLoop.

    The connection's file descriptor is registered on the IOLoop; each time
    the loop reports activity, ``connection.poll()`` is called and the handler
    is re-armed for whatever psycopg2 wants next. When polling reports
    ``POLL_OK`` the currently stored callback is invoked.
    """

    def __init__(self, dsn, connection_factory=base_connection, callback=None):
        """Start connecting and register the socket on the IOLoop.

        :param dsn: connection string passed to ``psycopg2.connect``.
        :param connection_factory: connection class handed to psycopg2.
        :param callback: optional callable invoked with this ``Connection``
            once polling reports ``POLL_OK``. ``None`` means no callback.
        """
        # ``async`` became a reserved word in Python 3.7, so ``async=1`` no
        # longer parses; psycopg2 accepts ``async_`` as an alias.
        self.connection = psycopg2.connect(
            dsn, async_=1, connection_factory=connection_factory)
        self.ioloop = IOLoop.instance()
        # partial(None, ...) raises TypeError, so only wrap a real callable;
        # io_callback() checks for None before calling.
        self.callback = None if callback is None else partial(callback, self)

        # Number of times io_callback() has run (printed on every call).
        self.counter = 0

        state = self.connection.poll()
        self.ioloop.add_handler(
            self.connection.fileno(), self.io_callback,
            IOLoop.ERROR | state_map.get(state, IOLoop.ERROR))

    def io_callback(self, fd, events):
        """IOLoop handler: advance the connection state machine.

        :param fd: file descriptor the IOLoop reported activity on.
        :param events: event mask from the IOLoop (not inspected here).
        :raises OperationalError: if ``poll()`` returns a state other than
            ``POLL_OK``, ``POLL_READ`` or ``POLL_WRITE``.
        """
        self.counter += 1
        print(self.counter)

        state = self.connection.poll()
        # Re-arm for what psycopg2 asked for; unmapped states (including
        # POLL_OK) fall back to an empty event mask.
        self.ioloop.update_handler(fd, state_map.get(state, 0))

        if state == POLL_OK:
            if self.callback is not None:
                self.callback()
        elif state not in (POLL_READ, POLL_WRITE):
            raise OperationalError('poll() returned {0}'.format(state))

    def execute(self, operation, parameters=(), cursor_factory=base_cursor,
                callback=_dummy_callback):
        """Send a query and arrange for *callback* to receive its cursor.

        :param operation: SQL statement passed to ``cursor.execute``.
        :param parameters: parameters bound to *operation*.
        :param cursor_factory: cursor class used to create the cursor.
        :param callback: callable invoked with the cursor once polling
            reports ``POLL_OK``. ``None`` means no callback.
        """
        cursor = self.connection.cursor(cursor_factory=cursor_factory)
        cursor.execute(operation, parameters)

        # Set callback and connection state. As in __init__, avoid
        # partial(None, ...), which would raise TypeError.
        self.callback = None if callback is None else partial(callback, cursor)

        state = self.connection.poll()
        # NOTE(review): if poll() already returns POLL_OK here, the handler
        # waits on ERROR only and the callback is not triggered from this
        # method -- confirm whether that can happen in practice.
        self.ioloop.update_handler(
            self.connection.fileno(), state_map.get(state, IOLoop.ERROR))
 
 
def test_conn_callback(connection):
    """Connection-ready hook: report the connection, then run a trivial query.

    The query result is delivered to ``test_curs_callback``.
    """
    print('Test conn: ', connection)
    query = 'SELECT 1;'
    connection.execute(query, callback=test_curs_callback)
 
def test_curs_callback(cursor):
    """Query-finished hook: report the cursor, then print all fetched rows."""
    print('Test curs: ', cursor)
    rows = cursor.fetchall()
    print(rows)
 
 
if __name__ == '__main__':
    # Script entry point: open one asynchronous connection, then hand control
    # to the IOLoop, which drives the callbacks.
    dsn = (
        'dbname=momoko_db user=momoko_user password=password '
        'host=localhost port=5432'
    )
    Connection(dsn=dsn, callback=test_conn_callback)

    IOLoop.instance().start()

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.