Created
December 2, 2014 19:22
-
-
Save iffy/c2d9965d7429f2e54760 to your computer and use it in GitHub Desktop.
An idea for pooling in Alchimia
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from twisted.internet import defer | |
from collections import deque | |
class TimeoutError(Exception):
    """
    Raised (via errback) when a connection could not be acquired from a
    L{Pooler} within the configured timeout.
    """
class Pooler(object):
    """
    I pool db connections for alchimia.

    Things I do:

     - I limit the total number of connections you use.
     - I timeout requests to acquire connections after a specified time.
     - I give a connection slot back when establishing a connection fails.

    Things I don't do:

     - I don't detect closed connections and remove them from the pool.
     - I don't restrict connections to a particular thread (at least,
       not intentionally)
     - I don't timeout execution of SQL.
    """

    def __init__(self, engine, max_connections=5, timeout=60, _clock=None):
        """
        @param engine: A TwistedEngine
        @param max_connections: Maximum number of simultaneous connections.
        @param timeout: Seconds after which an acquire will fail.
        @param _clock: For testing.  This is the clock used to schedule
            timeout.  When omitted, the global reactor is used.
        """
        if _clock is None:
            # Only touch the global reactor when no clock was injected, so
            # tests that pass a fake clock never install a real reactor.
            from twisted.internet import reactor
            _clock = reactor
        self._engine = engine
        self._clock = _clock
        self.max_connections = max_connections
        self.timeout = timeout
        # Connections opened, or currently being opened, by this pool.
        self.number_of_connections = 0
        # Connections currently handed out to callers.
        self.connections_being_used = deque()
        # Open connections waiting for someone to acquire them.
        self.idle_connections = deque()
        # Deferreds of callers waiting for a connection, oldest first.
        self._pending_acquires = deque()
        # True while _tick is running; turns nested calls into no-ops.
        self._ticking = False

    def execute(self, *args, **kwargs):
        """
        Execute a single statement within a transaction.

        All arguments are passed through to the connection's C{execute}.

        @return: A C{Deferred} firing with the result of C{execute}.
        """
        return self.runInTransaction(self._transactionalExecute,
                                     *args, **kwargs)

    def _transactionalExecute(self, conn, *args, **kwargs):
        """
        Adapter so that L{execute} can be run through L{runInTransaction}.
        """
        return conn.execute(*args, **kwargs)

    @defer.inlineCallbacks
    def runInTransaction(self, func, *args, **kwargs):
        """
        Run a function within a transaction.  The function will be called with
        a C{Connection} as the first argument and with whatever C{*args} and
        C{**kwargs} you pass in.

        The transaction is committed if C{func} succeeds and rolled back if
        C{func} or the commit raises.  The connection is always released.

        @return: A C{Deferred} firing with the result of C{func}, or failing
            with whatever error happened (including L{TimeoutError} if no
            connection could be acquired in time).
        """
        conn = yield self.acquireConnection()
        try:
            # begin() sits inside the try so that a failure to start the
            # transaction still hands the connection back to the pool.
            transaction = yield conn.begin()
            try:
                ret = yield func(conn, *args, **kwargs)
                yield transaction.commit()
            except Exception as e:
                yield transaction.rollback()
                raise e
        finally:
            self.releaseConnection(conn)
        defer.returnValue(ret)

    def acquireConnection(self):
        """
        Get a connection (when it's available).  When you're done,
        call L{releaseConnection}.  If you don't want to have to worry about
        that, then use L{runInTransaction} instead.

        @return: A C{Deferred} firing with a connection, or failing with
            L{TimeoutError} after C{self.timeout} seconds, or with the
            connection error if a new connection could not be established.
        """
        d = defer.Deferred()
        later = self._clock.callLater(
            self.timeout, self._timeoutAcquisition, d)
        # addBoth: the timer must also be stopped when the acquire fails for
        # a reason other than the timeout itself.
        d.addBoth(self._cancelTimeoutCheck, later)
        self._pending_acquires.append(d)
        self._tick()
        return d

    def _tick(self):
        """
        Hand connections to waiting acquirers for as long as there is both
        a waiter and either an idle connection or room to open a new one.
        """
        if self._ticking:
            # A callback fired from the loop below re-entered us (for
            # example by releasing a connection).  The running loop
            # re-checks the pool state on every pass, so it will pick
            # the change up without growing the stack.
            return
        self._ticking = True
        try:
            while self._pending_acquires:
                if self.idle_connections:
                    waiting = self._pending_acquires.popleft()
                    conn = self.idle_connections.popleft()
                    self._assignConnection(conn, waiting)
                elif self.number_of_connections < self.max_connections:
                    # Reserve the slot before connecting so concurrent
                    # acquires cannot exceed max_connections.
                    self.number_of_connections += 1
                    waiting = self._pending_acquires.popleft()
                    # maybeDeferred also turns a synchronous raise from
                    # connect() into a failure we can route to the waiter.
                    connecting = defer.maybeDeferred(self._engine.connect)
                    connecting.addCallbacks(
                        self._assignConnection, self._connectFailed,
                        callbackArgs=(waiting,), errbackArgs=(waiting,))
                else:
                    break
        finally:
            self._ticking = False

    def _assignConnection(self, conn, waiting):
        """
        Give C{conn} to the C{waiting} Deferred, or park it as idle if the
        waiter has already been fired (timed out or cancelled).
        """
        if waiting.called:
            self.idle_connections.append(conn)
            self._tick()
            return
        self.connections_being_used.append(conn)
        waiting.callback(conn)

    def _connectFailed(self, failure, waiting):
        """
        A new connection could not be established: give the reserved slot
        back and report the failure to the waiter.
        """
        self.number_of_connections -= 1
        if not waiting.called:
            waiting.errback(failure)
        # If the waiter was already fired it has received its own error
        # (e.g. TimeoutError), so the connection failure is dropped here.
        self._tick()

    def _cancelTimeoutCheck(self, result, call):
        """
        Stop the timeout timer once the acquire has finished, passing the
        result (connection or failure) through unchanged.
        """
        # When the timeout itself fired the Deferred the call is no longer
        # active, and cancelling it would raise.
        if call.active():
            call.cancel()
        return result

    def _timeoutAcquisition(self, d):
        """
        Fail a pending acquire because it waited too long.
        """
        try:
            self._pending_acquires.remove(d)
        except ValueError:
            # Already dequeued: a connection is being established for it.
            # _assignConnection / _connectFailed cope with a fired waiter.
            pass
        d.errback(TimeoutError('Waited %ds for connection' % (self.timeout,)))

    def releaseConnection(self, conn):
        """
        Mark a connection as "done" and ready for someone else to acquire.

        @raise ValueError: if C{conn} is not currently marked as in use.
        """
        self.connections_being_used.remove(conn)
        self.idle_connections.append(conn)
        self._tick()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment