Skip to content

Instantly share code, notes, and snippets.

@ls0f
Last active September 16, 2015 10:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ls0f/f38f277f7aedd30f678f to your computer and use it in GitHub Desktop.
Save ls0f/f38f277f7aedd30f678f to your computer and use it in GitHub Desktop.
#coding:utf-8
# https://gist.github.com/opensourcegeek/9822127
from gevent import monkey
monkey.patch_all()
import logging
import gevent
from gevent.queue import Queue, Empty
import pymysql as db
db.install_as_MySQLdb()
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger("pool")
class ConnectionPool:
def __init__(self, db_config, pool_size=10, time_to_sleep=30):
self.username = db_config.get('user')
self.password = db_config.get('password')
self.host = db_config.get('host')
self.port = int(db_config.get('port'))
self.max_pool_size = pool_size
self.pool = None
self.time_to_sleep = time_to_sleep
self._initialize_pool()
def get_conn(self):
conn = db.connect(host=self.host, user=self.username,
passwd=self.password, port=self.port, charset="utf8")
return conn
def get(self):
try:
return self.pool.get(timeout=0.2)
except Empty:
return self.get_conn()
def put(self, conn):
if self.pool.qsize() > self.max_pool_size:
LOGGER.debug("pool size is full, will not push")
self._close_conn(conn)
else:
self.pool.put_nowait(conn)
@staticmethod
def _close_conn(conn):
try:
conn.close()
except db.OperationalError, e:
LOGGER.error("conn close fail")
def get_pool_size(self):
return self.pool.qsize()
def supply_conn_for_pool(self):
current_pool_size = self.pool.qsize()
if current_pool_size < self.max_pool_size:
for _ in xrange(0, self.max_pool_size - current_pool_size):
try:
conn = self.get_conn()
self.pool.put_nowait(conn)
except db.OperationalError, e:
LOGGER.error("Cannot initialize connection pool - retrying in {} seconds".format(self.time_to_sleep))
LOGGER.exception(e)
break
def _initialize_pool(self):
self.pool = Queue()
self.supply_conn_for_pool()
gevent.spawn(self._check_for_connection_loss)
def _check_for_connection_loss(self):
while True:
try:
conn = self.pool.get(timeout=0.2)
except Empty:
conn = None
if not self._ping(conn):
LOGGER.debug("conn is loss".format(str(conn)))
else:
LOGGER.debug("conn is alive".format(str(conn)))
self.pool.put_nowait(conn)
if self.pool.qsize() < self.max_pool_size / 2:
self.supply_conn_for_pool()
LOGGER.debug("pool size is {}".format(self.pool.qsize()))
gevent.sleep(self.time_to_sleep)
def _ping(self, conn):
if conn is None:
return False
try:
cursor = conn.cursor()
cursor.execute('select 1;')
LOGGER.debug(cursor.fetchall())
return True
except db.OperationalError, e:
LOGGER.warn('Cannot connect to mysql - retrying in {} seconds'.format(self.time_to_sleep))
LOGGER.exception(e)
return False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment