Skip to content

Instantly share code, notes, and snippets.

@iffy
Created December 2, 2014 19:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save iffy/c2d9965d7429f2e54760 to your computer and use it in GitHub Desktop.
Save iffy/c2d9965d7429f2e54760 to your computer and use it in GitHub Desktop.
An idea for pooling in Alchimia
from twisted.internet import defer
from collections import deque
class TimeoutError(Exception):
    """
    Signal that an operation did not complete within its allotted time.

    NOTE: on Python 3 this name shadows the builtin C{TimeoutError} inside
    this module; this class derives directly from C{Exception}.
    """
class Pooler(object):
    """
    I pool db connections for alchimia.

    Things I do:

    - I limit the total number of connections you use.
    - I timeout requests to acquire connections after a specified time.
    - I give back pool capacity when opening a connection fails, and I pass
      that failure on to whoever was waiting for the connection.

    Things I don't do:

    - I don't detect closed connections and remove them from the pool.
    - I don't restrict connections to a particular thread (at least,
      not intentionally)
    - I don't timeout execution of SQL.

    @ivar max_connections: Maximum number of simultaneous connections.
    @ivar timeout: Seconds after which an acquire will fail.
    @ivar number_of_connections: Connections opened, or currently being
        opened, through this pool.
    @ivar connections_being_used: Connections currently handed out.
    @ivar idle_connections: Open connections waiting to be handed out.
    """

    def __init__(self, engine, max_connections=5, timeout=60, _clock=None):
        """
        @param engine: A TwistedEngine
        @param max_connections: Maximum number of simultaneous connections.
        @param timeout: Seconds after which an acquire will fail.
        @param _clock: For testing. This is the clock used to schedule timeout.
        """
        if not _clock:
            # Only pull in the global reactor when no clock was supplied, so
            # tests that pass their own clock never touch it.
            from twisted.internet import reactor
            _clock = reactor
        self._engine = engine
        self._clock = _clock
        self.max_connections = max_connections
        self.timeout = timeout
        self.number_of_connections = 0
        self.connections_being_used = deque()
        self.idle_connections = deque()
        # Deferreds handed out by acquireConnection that have not yet been
        # paired with a connection (or with an in-flight connect).
        self._pending_acquires = deque()

    def execute(self, *args, **kwargs):
        """
        Execute a single statement within a transaction.

        All arguments are passed through to the connection's C{execute}.

        @return: A C{Deferred} firing with the result of C{execute}.
        """
        return self.runInTransaction(
            self._transactionalExecute, *args, **kwargs)

    def _transactionalExecute(self, conn, *args, **kwargs):
        """
        Adapter letting L{runInTransaction} drive C{conn.execute}.
        """
        return conn.execute(*args, **kwargs)

    @defer.inlineCallbacks
    def runInTransaction(self, func, *args, **kwargs):
        """
        Run a function within a transaction. The function will be called with
        a C{Connection} as the first argument and with whatever C{*args} and
        C{**kwargs} you pass in.

        The transaction is committed if C{func} succeeds and rolled back if
        C{func} or the commit raises.  The connection is always released,
        including when the transaction cannot be started.

        @return: A C{Deferred} firing with whatever C{func} returned.
        @raise TimeoutError: (via the C{Deferred}) if no connection could be
            acquired in time.
        """
        conn = yield self.acquireConnection()
        try:
            # begin() lives inside the try so that a failure to start the
            # transaction still returns the connection to the pool.
            transaction = yield conn.begin()
            try:
                ret = yield func(conn, *args, **kwargs)
                yield transaction.commit()
            except Exception as e:
                yield transaction.rollback()
                # Re-raise the bound exception explicitly: a bare ``raise``
                # after a ``yield`` is unreliable in Python 2 generators.
                raise e
        finally:
            yield self.releaseConnection(conn)
        defer.returnValue(ret)

    def acquireConnection(self):
        """
        Get a connection (when it's available). When you're done,
        call L{releaseConnection}. If you don't want to have to worry about
        that, then use L{runInTransaction} instead.

        @return: A C{Deferred} firing with a connection, or failing with
            L{TimeoutError} after C{self.timeout} seconds, or failing with
            the connect error if a new connection could not be opened.
        """
        d = defer.Deferred()
        later = self._clock.callLater(
            self.timeout, self._timeoutAcquisition, d)
        # addBoth: the timer must also be stopped when the acquire fails for
        # a reason other than the timeout (e.g. the connect failed).
        d.addBoth(self._cancelTimeoutCheck, later)
        self._pending_acquires.append(d)
        self._tick()
        return d

    def _tick(self):
        """
        Serve the oldest pending acquire, if a connection can be had for it.

        An idle connection is preferred; otherwise a new one is opened as
        long as the pool is below C{max_connections}.
        """
        if not self._pending_acquires:
            return
        if self.idle_connections:
            waiting = self._pending_acquires.popleft()
            conn = self.idle_connections.popleft()
            self._assignConnection(conn, waiting)
        elif self.number_of_connections < self.max_connections:
            # Count the connection before it exists so that concurrent ticks
            # cannot overshoot max_connections while the connect is in flight.
            self.number_of_connections += 1
            waiting = self._pending_acquires.popleft()
            connecting = self._engine.connect()
            connecting.addCallbacks(
                self._assignConnection, self._connectionFailed,
                callbackArgs=(waiting,), errbackArgs=(waiting,))

    def _assignConnection(self, conn, waiting):
        """
        Hand C{conn} to the C{waiting} Deferred.

        If C{waiting} has already fired (it timed out or was cancelled while
        the connection was being prepared), the connection is parked as idle
        and offered to the next pending acquire instead.
        """
        if waiting.called:
            self.idle_connections.append(conn)
            self._tick()
            return
        self.connections_being_used.append(conn)
        waiting.callback(conn)

    def _connectionFailed(self, reason, waiting):
        """
        Handle a failed C{engine.connect()}: free the reserved slot and
        propagate the failure to the waiter it was meant for.
        """
        self.number_of_connections -= 1
        unhandled = None
        if waiting.called:
            # Nobody is left to receive the error; return it so it stays on
            # the connect Deferred rather than being silently swallowed.
            unhandled = reason
        else:
            waiting.errback(reason)
        # A slot just opened up; give the next pending acquire a chance.
        self._tick()
        return unhandled

    def _cancelTimeoutCheck(self, result, call):
        """
        Stop the timeout timer (if it has not fired or been cancelled) and
        pass C{result} through unchanged.
        """
        if call.active():
            call.cancel()
        return result

    def _timeoutAcquisition(self, d):
        """
        Fail the acquire C{d} with L{TimeoutError}.
        """
        try:
            self._pending_acquires.remove(d)
        except ValueError:
            # Already paired with an in-flight connect, so it is no longer
            # queued.  _assignConnection / _connectionFailed cope with a
            # waiter that has already fired.
            pass
        if not d.called:
            d.errback(
                TimeoutError('Waited %ds for connection' % (self.timeout,)))

    def releaseConnection(self, conn):
        """
        Mark a connection as "done" and ready for someone else to acquire.

        @raise ValueError: if C{conn} is not currently marked as in use.
        """
        self.connections_being_used.remove(conn)
        self.idle_connections.append(conn)
        self._tick()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment