Created
February 20, 2017 09:20
-
-
Save zeayes/14d26cb7513bf7adbf970a36f061ec17 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import time | |
import logging | |
from tornado import gen | |
from collections import deque | |
class PoolExhaustedError(Exception):
    """Raised by ``AsyncPool.get`` when no more connections may be handed out."""
class PoolClientError(Exception):
    """Raised when a client class lacks the attributes the pool relies on."""
class AsyncPool(object):
    """Bounded pool of asynchronous client connections to a single host/port.

    Connections handed out by :meth:`get` are counted in ``active_num``;
    connections returned through :meth:`put` are parked in
    ``idle_conn_queue`` (oldest on the left, most recently used on the
    right) until they are reused or pruned after ``idle_timeout`` seconds.

    ``ClientClass`` must be constructible as
    ``ClientClass(host, port, connect_timeout)`` and provide ``connect()``
    (yieldable inside a Tornado coroutine), ``close()`` and ``closed``.
    The pool reads ``closed`` as a plain attribute/property, never by
    calling it.

    Args:
        host: Server host passed to ``ClientClass``.
        port: Server port passed to ``ClientClass``.
        ClientClass: Client type used to create new connections.
        connect_timeout: Timeout forwarded to ``ClientClass``.
        max_active_num: Maximum number of connections checked out at once.
        max_idle_num: Maximum number of connections kept in the idle queue.
        idle_timeout: Seconds an idle connection may stay parked before
            it is closed and discarded.

    Raises:
        PoolClientError: If ``ClientClass`` lacks ``connect``, ``close``
            or ``closed``.
    """

    def __init__(self, host, port, ClientClass, connect_timeout=3,
                 max_active_num=15, max_idle_num=10, idle_timeout=600):
        required = ('connect', 'close', 'closed')
        if not all(hasattr(ClientClass, attr) for attr in required):
            raise PoolClientError('pool doesnt support such client lacking some attributes')
        self.host = host
        self.port = port
        self.active_num = 0
        self.idle_conn_queue = deque()
        self.max_idle_num = max_idle_num
        self.max_active_num = max_active_num
        self.connect_timeout = connect_timeout
        # Must be stored: check_idle_queue() reads it on every get()/put().
        self.idle_timeout = idle_timeout
        self.ClientClass = ClientClass

    @property
    def server_address(self):
        """Return the pool target formatted as ``"host:port"``."""
        return '%s:%s' % (self.host, self.port)

    def check_idle_queue(self):
        """Close and drop idle connections parked longer than ``idle_timeout``.

        Connections are appended in ``idle_at`` order, so pruning stops at
        the first (leftmost) connection that has not timed out yet.
        """
        now = time.time()
        while self.idle_conn_queue:
            oldest = self.idle_conn_queue[0]
            if oldest.idle_at + self.idle_timeout > now:
                break
            conn = self.idle_conn_queue.popleft()
            if not conn.closed:
                conn.close()
            logging.info('%s prune idle client, current active_num: %s idle_num: %s',
                         self.server_address, self.active_num,
                         len(self.idle_conn_queue))

    def put(self, client):
        """Return a checked-out ``client`` to the pool.

        The active counter is always decremented. A client that is already
        closed is dropped; an open client is parked in the idle queue
        unless the queue is full, in which case it is closed instead.
        """
        self.active_num -= 1
        if client.closed:
            return
        self.check_idle_queue()
        current_idle_num = len(self.idle_conn_queue)
        # ">=" so the queue never holds more than max_idle_num connections.
        if current_idle_num >= self.max_idle_num:
            client.close()
            logging.info('%s current idle_conn_queue exceed: %s',
                         self.server_address, current_idle_num)
            return
        client.idle_at = time.time()
        self.idle_conn_queue.append(client)

    @gen.coroutine
    def get(self):
        """Check out a connection, reusing an idle one when possible.

        Returns (via ``gen.Return``) an open client. The most recently
        parked idle connection is preferred; idle connections found
        closed are discarded. If none is usable, a new client is created
        and connected.

        Raises:
            PoolExhaustedError: If ``max_active_num`` connections are
                already checked out.
        """
        # ">=" so at most max_active_num connections are ever checked out.
        if self.active_num >= self.max_active_num:
            raise PoolExhaustedError("connection pool exhausted. current active_num: %d" % self.active_num)
        self.check_idle_queue()

        client = None
        while self.idle_conn_queue:
            candidate = self.idle_conn_queue.pop()
            # A parked connection may have been closed while idle.
            if not candidate.closed:
                client = candidate
                break

        # Reserve the slot before yielding so concurrent get() calls
        # cannot all pass the limit check while connects are in flight.
        self.active_num += 1
        if client is None:
            try:
                client = self.ClientClass(self.host, self.port, self.connect_timeout)
                yield client.connect()
            except Exception:
                # Release the reserved slot; the connection never came up.
                self.active_num -= 1
                raise
            logging.info('%s create new client, current active_num: %s idle_num: %s',
                         self.server_address, self.active_num,
                         len(self.idle_conn_queue))
        raise gen.Return(client)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment