Skip to content

Instantly share code, notes, and snippets.

@eungju
Last active January 1, 2016 22:19
Show Gist options
  • Save eungju/8209329 to your computer and use it in GitHub Desktop.
Save eungju/8209329 to your computer and use it in GitHub Desktop.
Thrift Client Pooling.
class ThriftClientPool(object):
    """Pool of ``ThriftConnection`` objects for a single service endpoint.

    Connections live in a queue and are handed out through the
    :meth:`client` context manager.  The queue is seeded with ``None``
    placeholders; a placeholder is turned into a real connection the
    first time it is acquired.

    Recognised ``pool_conf`` keys (defaults in parentheses):

    * ``max`` (0) -- capacity handed to ``Queue``.  With the standard
      library queue a value <= 0 means "unbounded" (assumes ``Queue`` is
      the stdlib class -- confirm against the file's imports).
    * ``min`` (1) -- number of ``None`` placeholders put in the queue at
      construction time.
    * ``idle_expire`` (30) -- a pooled connection whose ``access_time``
      is more than this many seconds old is closed and replaced.
    * ``expire`` (3 * 60) -- same check, measured from ``open_time``.

    The pool records the pid that built it and rebuilds itself when
    :meth:`client` is called from a different process.
    """

    def __init__(self, service, host, port, pool_conf=None):
        """Store the endpoint, merge ``pool_conf`` over the defaults and
        seed the queue.

        :param service: passed through to ``ThriftConnection``.
        :param host: passed through to ``ThriftConnection``.
        :param port: passed through to ``ThriftConnection``.
        :param pool_conf: optional dict overriding any default setting.
        """
        self.service = service
        self.host = host
        self.port = port
        self.pool_conf = {"max": 0, "min": 1, "idle_expire": 30, "expire": 3 * 60}
        if pool_conf:
            self.pool_conf.update(pool_conf)
        self.pool = Queue(self.pool_conf["max"])
        # Placeholders only: real connections are opened on first acquire.
        for i in range(self.pool_conf["min"]):
            self.pool.put(None)
        # Remembered so client() can notice it is running in another process.
        self.pid = os.getpid()

    def dispose(self):
        """Drain the queue, closing every real connection found in it."""
        while True:
            try:
                conn = self.pool.get(block=False)
                if conn:
                    self._abandon(conn)
            except Empty:
                break

    def _acquire(self):
        """Return a connection, opening a new one when none is reusable.

        Waits up to 0.3 seconds for a queue entry.  An entry that is a
        ``None`` placeholder, or a connection past either expiry limit,
        is replaced by a new ``ThriftConnection``.
        """
        conn = None
        try:
            conn = self.pool.get(block=True, timeout=0.3)
        except Empty:
            log.warning("Reached the maximum, but create one more.")
        if conn:
            now = time.time()
            idle_too_long = now - conn.access_time > self.pool_conf["idle_expire"]
            too_old = now - conn.open_time > self.pool_conf["expire"]
            if idle_too_long or too_old:
                log.info("Discard obsolete one.")
                conn.close()
                conn = None
        return conn or ThriftConnection(self.service, self.host, self.port)

    def _release(self, conn):
        """Touch ``conn`` and put it back; close it if the queue is full."""
        try:
            conn.touch()
            self.pool.put(conn, block=False)
        except Full:
            log.warning("The pool is full, discard it.")
            conn.close()

    def _abandon(self, conn):
        """Close ``conn`` without returning it to the queue."""
        conn.close()

    @contextmanager
    def client(self):
        """Context manager yielding ``conn.client`` of an acquired connection.

        On normal exit, and on any exception other than
        ``Thrift.TException``, the connection goes back to the pool.  On
        ``Thrift.TException`` the connection is closed instead.  Every
        exception raised inside the ``with`` body is re-raised.
        """
        if os.getpid() != self.pid:
            log.info("This pool is created by other process, reinitializing it.")
            self.dispose()
            self.__init__(self.service, self.host, self.port, pool_conf=self.pool_conf)
        conn = self._acquire()
        try:
            yield conn.client
        except Thrift.TException:
            self._abandon(conn)
            raise
        except BaseException:
            # Explicit spelling of the former bare ``except:``: a failure in
            # the caller's code does not invalidate the connection.
            self._release(conn)
            raise
        else:
            # Kept outside the ``try`` on purpose: if handing the connection
            # back fails, the error must propagate once instead of being
            # caught above and triggering a second release/close.
            self._release(conn)
@eungju
Copy link
Author

eungju commented Jan 3, 2014

This pool should not be shared between OS processes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment