Skip to content

Instantly share code, notes, and snippets.

@zeayes
Created February 20, 2017 09:20
Show Gist options
  • Save zeayes/14d26cb7513bf7adbf970a36f061ec17 to your computer and use it in GitHub Desktop.
Save zeayes/14d26cb7513bf7adbf970a36f061ec17 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import time
import logging
from tornado import gen
from collections import deque
class PoolExhaustedError(Exception):
pass
class PoolClientError(Exception):
pass
class AsyncPool(object):
def __init__(self, host, port, ClientClass, connect_timeout=3,
max_active_num=15, max_idle_num=10, idle_timeout=600):
self.host = host
self.port = port
self.active_num = 0
self.idle_conn_queue = deque()
self.max_idle_num = max_idle_num
self.max_active_num = max_active_num
self.connect_timeout = connect_timeout
if not hasattr(ClientClass, 'connect') or not hasattr(ClientClass, 'close')\
or not hasattr(ClientClass, 'closed'):
raise PoolClientError('pool doesnt support such client lacking some attributes')
self.ClientClass = ClientClass
@property
def server_address(self):
return '%s:%s' % (self.host, self.port)
def check_idle_queue(self):
while self.idle_conn_queue:
c = self.idle_conn_queue[0]
# idle_conn_queue is empty or first connection is not timeout.
if not c or c.idle_at + self.idle_timeout > time.time():
break
conn = self.idle_conn_queue.popleft()
if not conn.closed:
conn.closed()
logging.info('%s prune idle client, current active_num: %s idle_num: %s'
% (self.server_address, self.active_num, len(self.idle_conn_queue)))
def put(self, client):
self.active_num -= 1
if client.closed:
return
self.check_idle_queue()
current_idle_num = len(self.idle_conn_queue)
if current_idle_num > self.max_idle_num:
client.close()
logging.info('%s current idle_conn_queue exceed: %s' % (self.server_address, current_idle_num))
return
client.idle_at = time.time()
self.idle_conn_queue.append(client)
@gen.coroutine
def get(self):
if self.active_num > self.max_active_num:
raise PoolExhaustedError("connection pool exhausted. current active_num: %d" % self.active_num)
self.check_idle_queue()
client = None
if self.idle_conn_queue:
client = self.idle_conn_queue.pop()
if not client:
client = self.ClientClass(self.host, self.port, self.connect_timeout)
yield client.connect()
logging.info('%s create new client, current active_num: %s idle_num: %s'
% (self.server_address, self.active + 1, len(self.idle_queue)))
self.active_num += 1
raise gen.Return(client)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment