import sys | |
from functools import partial | |
import psycopg2 | |
from psycopg2 import OperationalError | |
from psycopg2.extensions import (POLL_OK, POLL_READ, POLL_WRITE, POLL_ERROR, | |
connection as base_connection, cursor as base_cursor) | |
from tornado.ioloop import IOLoop | |
# Human-readable names for the Tornado IOLoop event flags.
# NOTE(review): not referenced elsewhere in this section; presumably kept
# for debug output -- confirm before removing.
io_poll = {
    IOLoop.NONE: 'IOLoop.NONE',
    IOLoop.READ: 'IOLoop.READ',
    IOLoop.WRITE: 'IOLoop.WRITE',
    IOLoop.ERROR: 'IOLoop.ERROR',
}

# Human-readable names for the psycopg2 poll() result codes.
# NOTE(review): likewise not referenced elsewhere in this section.
psy_poll = {
    POLL_OK: 'POLL_OK',
    POLL_READ: 'POLL_READ',
    POLL_WRITE: 'POLL_WRITE',
    POLL_ERROR: 'POLL_ERROR',
}

# Translates a psycopg2 poll() result into the IOLoop event to wait for.
# POLL_OK deliberately has no entry: every lookup goes through
# ``state_map.get(state, <fallback>)`` so each call site picks its own
# event mask for the "nothing left to wait for" case.
state_map = {
    POLL_READ: IOLoop.READ,
    POLL_WRITE: IOLoop.WRITE,
    POLL_ERROR: IOLoop.ERROR,
}
def _dummy_callback(cursor): | |
pass | |
class Connection:
    """Asynchronous psycopg2 connection driven by the Tornado IOLoop.

    The connection's file descriptor is registered with the IOLoop; each
    time the loop reports activity the connection is polled again, and
    once psycopg2 reports ``POLL_OK`` the currently pending callback is
    invoked.
    """

    def __init__(self, dsn, connection_factory=base_connection, callback=None):
        """Open the connection asynchronously and register it with the IOLoop.

        :param dsn: connection string handed to ``psycopg2.connect``.
        :param connection_factory: connection class passed through to
            ``psycopg2.connect``.
        :param callback: optional callable invoked as ``callback(self)``
            when polling first reports ``POLL_OK``. ``None`` means no
            callback.
        """
        # ``async`` is a reserved word since Python 3.7, so ``async=1`` no
        # longer parses; ``async_`` is the alias psycopg2 (>= 2.7) provides.
        self.connection = psycopg2.connect(
            dsn, async_=1, connection_factory=connection_factory)
        self.ioloop = IOLoop.instance()
        # partial(None, ...) raises TypeError, so only wrap a real callable.
        # io_callback() already copes with ``self.callback is None``.
        self.callback = None if callback is None else partial(callback, self)
        # Number of times io_callback() has been entered (debug aid).
        self.counter = 0

        state = self.connection.poll()
        # Always listen for errors, plus whatever the poll state asks for.
        self.ioloop.add_handler(
            self.connection.fileno(), self.io_callback,
            IOLoop.ERROR | state_map.get(state, IOLoop.ERROR))

    def io_callback(self, fd, events):
        """IOLoop handler: poll the connection and fire the callback when done.

        :param fd: file descriptor the IOLoop reported activity on.
        :param events: IOLoop event mask (unused; the poll state decides).
        :raises OperationalError: if ``poll()`` returns a state other than
            ``POLL_OK``, ``POLL_READ`` or ``POLL_WRITE``.
        """
        self.counter += 1
        print(self.counter)

        state = self.connection.poll()
        # States without a mapping (e.g. POLL_OK) fall back to 0, an empty
        # event mask, so the handler stays registered but idle.
        self.ioloop.update_handler(fd, state_map.get(state, 0))

        if state == POLL_OK:
            if self.callback is not None:
                self.callback()
        elif state not in (POLL_READ, POLL_WRITE):
            raise OperationalError('poll() returned {0}'.format(state))

    def execute(self, operation, parameters=(), cursor_factory=base_cursor,
                callback=_dummy_callback):
        """Start a query and arrange for ``callback(cursor)`` on completion.

        :param operation: SQL statement passed to ``cursor.execute``.
        :param parameters: parameters passed to ``cursor.execute``.
        :param cursor_factory: cursor class used to create the cursor.
        :param callback: callable invoked as ``callback(cursor)`` once
            polling reports ``POLL_OK``; ``None`` disables the callback.
        """
        cursor = self.connection.cursor(cursor_factory=cursor_factory)
        cursor.execute(operation, parameters)

        # Replace the pending callback, then re-arm the IOLoop handler for
        # whatever the connection now needs to wait on.
        self.callback = None if callback is None else partial(callback, cursor)
        state = self.connection.poll()
        self.ioloop.update_handler(
            self.connection.fileno(), state_map.get(state, IOLoop.ERROR))
def test_conn_callback(connection):
    """Print the ready connection, then run a trivial query on it.

    The query result is delivered to ``test_curs_callback``.
    """
    print('Test conn: ', connection)
    query = 'SELECT 1;'
    connection.execute(query, callback=test_curs_callback)
def test_curs_callback(cursor):
    """Print the cursor and every row it fetched."""
    print('Test curs: ', cursor)
    rows = cursor.fetchall()
    print(rows)
if __name__ == '__main__':
    # Demo entry point: open one connection, then hand control to the
    # IOLoop so the connection and query callbacks can fire.
    # NOTE(review): credentials are hard-coded in the DSN; acceptable only
    # for a local demo database.
    loop = IOLoop.instance()
    dsn = 'dbname=momoko_db user=momoko_user password=password host=localhost port=5432'
    Connection(dsn, callback=test_conn_callback)
    loop.start()
# (Web-page footer text from the hosting site removed; it was not part of the script.)