Skip to content

Instantly share code, notes, and snippets.

@anatoly-scherbakov
Forked from luqmansen/conn_pool.py
Last active November 25, 2021 00:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anatoly-scherbakov/cee83fa0fc4b7d74af34ee11ce7e6358 to your computer and use it in GitHub Desktop.
Save anatoly-scherbakov/cee83fa0fc4b7d74af34ee11ce7e6358 to your computer and use it in GitHub Desktop.
Simple Object Pool pattern to Create Connection Pool
from datetime import datetime
from multiprocessing import Lock
from dataclasses import dataclass
from datetime import datetime
import mysql.connector
USER = "root"
PASSWD = "admin"
HOST = 'localhost'
DB = "db_test"
class PoolExhausted(Exception):
"""Pool is exhausted."""
@dataclass
class Connection:
"""MySQL connection.""" # FIXME classes should have docstrings
user: str
password: str
host: str
db: str
created_time: datetime = field(default_factory=datetime.now)
# def __init__(self, user: str, pw: str, host: str, db: str): # FIXME use mypy to verify annotations
# self.created = datetime.now()
# # TODO: this connection creation should be lazy. We should not create a 100 connections to MySQL until we really need them.
# self.conn = mysql.connector.connect(user=user, password=pw, host=host, database=db)
def get_conn(self): # Avoid contractions: replace get_conn → get_connection
# This might be the more appropriate place to open a real connection to the DB.
return self.conn
def close_conn(self):
return self.conn.close()
# FIXME this can be made a dataclass either
class ConnectionPool:
def __init__(self, conn_number):
self.conn_number = conn_number
self.pool = [] # Replace with a Queue or with a deque - this will allow to remove locks
self.init_pool()
self.lock = Lock()
@staticmethod
def create_new_conn():
# FIXME should not be a staticmethod. Anyway, this should be encapsulated into the context manager.
return Connection(USER, PASSWD, HOST, DB)
def init_pool(self):
for i in range(self.conn_number):
conn = self.create_new_conn()
self.pool.append(conn.get_conn())
def get_conn(self):
# FIXME the `get_conn` function is removing connection from the pool and thus the pool is not capable
# of tracking if connection is lost.
# Also, if a connection is not available, we do not wait for it to become available - we just fail.
try:
with self.lock:
return self.pool.pop()
except IndexError as err:
raise PoolExhausted() from err # FIXME use a meaningful exception. But still better, wait till a connection becomes avaialble.
# perhaps it is a good idea to provide a timeout value as one of the pool parameters: if the connection does not
# become available when the timeout expires, only then we raise an exception.
def return_conn(self, conn: Connection):
# We impose a responsibility upon client code to return the connection back to us. If the client code fails to
# do so, we will end up with a lot of hanging connections.
with self.lock:
self.pool.append(conn)
# TODO write a context manager here and remove `return_conn()`
def main()
from threading import Thread # imports should be global, use flake8 to look for such inconsistencies
# I would recommend to pip install wemake-python-styleguide
pool = ConnectionPool(1)
def try_connection(pid):
try:
self.do_all_the_work():
except Exception as err:
logger.exception()
print("Conn from pid ", pid) # Use `logging` instead of `print`
try:
conn = pool.get_conn()
print(conn.get_server_info())
# try pulling connection without returning it
# pool.return_conn(conn)
except Exception as e: # This exception will be lost unseen if we do not look into the program stdout.
print(str(e))
# TODO this thread does not return the connection to the pool because the return_conn() call is commented.
# proc = []
# for i in range(200):
# proc.append(Thread(target=try_connection, args=(i,)))
# # TODO this will cause the code to fail because only one thread will consume the single available connection
# # and not return it.
# [p.start() for p in proc]
# [p.join() for p in proc]
# Use concurrent.futures instead of explicit thread operations like this
executor = concurrent.futures.ThreadPoolExucutor(max_workers=200)
executor.map(
try_connection,
range(200),
)
if __name__ == 'main':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment