Created
May 24, 2016 02:46
-
-
Save skolsuper/a48e5077b28a858f90806b736032abcf to your computer and use it in GitHub Desktop.
Rudimentary connection pool for rethinkdb asyncio connections
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from weakref import WeakSet | |
from asyncio import Queue, QueueEmpty | |
from logging import getLogger | |
from typing import Tuple | |
import rethinkdb as r | |
from rethinkdb.net import DefaultConnection | |
# Module-level logger, registered under the name 'rethinkdb.connection'.
l = getLogger('rethinkdb.connection')
# Select the driver's 'asyncio' loop type at import time; the pool below
# awaits the result of r.connect().
r.set_loop_type('asyncio')
# Type alias for a (str, tuple, dict) triple. It is not referenced in the
# visible part of this file; presumably (name, args, kwargs) -- TODO confirm.
DatabaseQuery = Tuple[str, tuple, dict]
class ConnectionPool:
    """Rudimentary pool of asyncio RethinkDB connections.

    Idle connections wait in an unbounded FIFO queue. Connections handed out
    by :meth:`get_conn` are tracked in a ``WeakSet`` until :meth:`put_conn`
    returns them, so :meth:`teardown` can close those as well.

    :meth:`set_config` must be called before :meth:`get_conn` or
    :meth:`get_config`; otherwise an ``AssertionError`` is raised.
    """

    def __init__(self):
        self._config_dict = None
        self._queue = Queue()
        # Weak references only: a connection the caller dropped without
        # returning it must not be kept alive by the pool.
        self._outstanding_connections = WeakSet()

    async def get_conn(self):
        """Return an open connection, reusing an idle one when possible.

        Idle connections that report ``is_open() == False`` are discarded.
        When the queue is exhausted a new connection is opened with
        ``r.connect(**config)``.

        :raises AssertionError: if no configuration has been set.
        """
        self._check_config()
        while True:
            try:
                conn = self._queue.get_nowait()
            except QueueEmpty:
                conn = await r.connect(**self._config_dict)
                break
            if conn.is_open():
                break
        self._outstanding_connections.add(conn)
        return conn

    async def put_conn(self, conn):
        """Hand ``conn`` back to the pool so it can be reused.

        Connections that were not obtained through :meth:`get_conn` are
        accepted too; ``discard`` is used so an untracked connection does not
        raise ``KeyError`` after it has already been queued.
        """
        self._outstanding_connections.discard(conn)
        self._queue.put_nowait(conn)

    def set_config(self, config):
        """Store the keyword arguments later passed to ``r.connect``."""
        self._config_dict = config

    def get_config(self):
        """Return the stored configuration.

        :raises AssertionError: if no configuration has been set.
        """
        self._check_config()
        return self._config_dict

    async def teardown(self):
        """Close every idle and every still-outstanding connection.

        Errors raised while closing an individual connection are logged at
        DEBUG level and do not stop the remaining connections from being
        closed.
        """
        # Collect strong references first. Parking drained connections in the
        # WeakSet would let them be garbage-collected before they are closed,
        # and iterating the live WeakSet across awaits can fail if another
        # task adds or removes a connection in the meantime.
        connections = []
        while True:
            try:
                connections.append(self._queue.get_nowait())
            except QueueEmpty:
                break
        connections.extend(self._outstanding_connections)
        self._outstanding_connections.clear()

        log = getLogger('rethinkdb.connection')
        for conn in connections:
            try:
                await conn.close()
            except Exception:
                # Best effort: keep closing the rest, but leave a trace.
                log.debug('Error while closing connection %r', conn, exc_info=True)

    def _check_config(self):
        """Raise ``AssertionError`` unless a configuration has been set."""
        # Raised explicitly (rather than via ``assert``) so the check still
        # runs when Python is started with -O.
        if self._config_dict is None:
            raise AssertionError("Did you remember to run resync.setup()?")


# Module-wide pool instance shared by the helpers below.
connection_pool = ConnectionPool()
class RethinkConnection:
    """
    Async context manager that borrows a connection from the module-level
    ``connection_pool`` on entry and returns it to the pool on exit.

    Usage::

        async with RethinkConnection() as conn:
            ...

    An exception raised inside the ``async with`` body is logged at DEBUG
    level and then propagates to the caller; it is never suppressed.
    """

    async def __aenter__(self):
        """Fetch a connection from the pool and return it."""
        self._conn = await connection_pool.get_conn()
        return self._conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Log any in-flight exception and give the connection back."""
        if exc_type is not None:
            # Pass the traceback through exc_info so the logger renders it;
            # formatting the traceback object with %s only prints its repr.
            l.debug('%s, %s', exc_type, exc_val,
                    exc_info=(exc_type, exc_val, exc_tb))
        await connection_pool.put_conn(self._conn)
def get_sync_connection(timeout=20):
    """
    Testing helper: open a blocking connection using the pool's configuration.

    :param timeout: forwarded both to the connection constructor and to
        ``reconnect``.
    :return: Synchronous (blocking) connection to rethinkdb
    """
    pool_settings = connection_pool.get_config()
    sync_conn = DefaultConnection(
        auth_key='', timeout=timeout, ssl={}, **pool_settings
    )
    return sync_conn.reconnect(timeout=timeout)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment