Last active
September 16, 2015 10:25
-
-
Save ls0f/f38f277f7aedd30f678f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#coding:utf-8 | |
# https://gist.github.com/opensourcegeek/9822127 | |
from gevent import monkey | |
monkey.patch_all() | |
import logging | |
import gevent | |
from gevent.queue import Queue, Empty | |
import pymysql as db | |
db.install_as_MySQLdb() | |
logging.basicConfig(level=logging.DEBUG) | |
LOGGER = logging.getLogger("pool") | |
class ConnectionPool: | |
def __init__(self, db_config, pool_size=10, time_to_sleep=30): | |
self.username = db_config.get('user') | |
self.password = db_config.get('password') | |
self.host = db_config.get('host') | |
self.port = int(db_config.get('port')) | |
self.max_pool_size = pool_size | |
self.pool = None | |
self.time_to_sleep = time_to_sleep | |
self._initialize_pool() | |
def get_conn(self): | |
conn = db.connect(host=self.host, user=self.username, | |
passwd=self.password, port=self.port, charset="utf8") | |
return conn | |
def get(self): | |
try: | |
return self.pool.get(timeout=0.2) | |
except Empty: | |
return self.get_conn() | |
def put(self, conn): | |
if self.pool.qsize() > self.max_pool_size: | |
LOGGER.debug("pool size is full, will not push") | |
self._close_conn(conn) | |
else: | |
self.pool.put_nowait(conn) | |
@staticmethod | |
def _close_conn(conn): | |
try: | |
conn.close() | |
except db.OperationalError, e: | |
LOGGER.error("conn close fail") | |
def get_pool_size(self): | |
return self.pool.qsize() | |
def supply_conn_for_pool(self): | |
current_pool_size = self.pool.qsize() | |
if current_pool_size < self.max_pool_size: | |
for _ in xrange(0, self.max_pool_size - current_pool_size): | |
try: | |
conn = self.get_conn() | |
self.pool.put_nowait(conn) | |
except db.OperationalError, e: | |
LOGGER.error("Cannot initialize connection pool - retrying in {} seconds".format(self.time_to_sleep)) | |
LOGGER.exception(e) | |
break | |
def _initialize_pool(self): | |
self.pool = Queue() | |
self.supply_conn_for_pool() | |
gevent.spawn(self._check_for_connection_loss) | |
def _check_for_connection_loss(self): | |
while True: | |
try: | |
conn = self.pool.get(timeout=0.2) | |
except Empty: | |
conn = None | |
if not self._ping(conn): | |
LOGGER.debug("conn is loss".format(str(conn))) | |
else: | |
LOGGER.debug("conn is alive".format(str(conn))) | |
self.pool.put_nowait(conn) | |
if self.pool.qsize() < self.max_pool_size / 2: | |
self.supply_conn_for_pool() | |
LOGGER.debug("pool size is {}".format(self.pool.qsize())) | |
gevent.sleep(self.time_to_sleep) | |
def _ping(self, conn): | |
if conn is None: | |
return False | |
try: | |
cursor = conn.cursor() | |
cursor.execute('select 1;') | |
LOGGER.debug(cursor.fetchall()) | |
return True | |
except db.OperationalError, e: | |
LOGGER.warn('Cannot connect to mysql - retrying in {} seconds'.format(self.time_to_sleep)) | |
LOGGER.exception(e) | |
return False | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment