Skip to content

Instantly share code, notes, and snippets.

@andymccurdy
Created January 22, 2020 20:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save andymccurdy/c49167e00691be6e636d001c3e68a12d to your computer and use it in GitHub Desktop.
Save andymccurdy/c49167e00691be6e636d001c3e68a12d to your computer and use it in GitHub Desktop.
"""
This is a simple benchmark that tests the performance of several
locking implementations. Each example implements a connection
pool that provides `get_connection()` to retrieve a connection
from the pool and `release()` to return a connection back to the
pool.
The `test()` function creates an instance of `pool_class` and
creates `num_threads` `threading.Thread` instances that simply call
`pool.get_connection()` and `pool.release()` repeatedly until
`num_seconds` has elapsed.
ListLockPool uses a `threading.Lock` to protect the critical
sections of `get_connection()` and `release()`.
ListConditionPool uses two `threading.Condition` objects, each using
the same `threading.Lock` to protect the critical sections of
`get_connections()` and `release()`.
ListConditionNotifyPool is identical to ListConditionPool except that
it also calls the relevant conditions `notify()` at the end of the
each critical section. This mimics `queue.Queue`'s implementation.
QueuePool uses the standard library's `queue.LifoQueue`.
Benchmarks:
All timings were performed on CPython 3.8.0 on OSX.
Using 10 threads for 10 seconds, each implementation roughly
performs at:
ListLockPool: 1x
ListConditionPool: 1.1x (always slightly faster than ListLockPool)
ListConditionNotifyPool: 4x
QueuePool: 3x
The ListConditionNotifyPool and QueuePool share a very similar
implementation. Python's LifoQueue class has more features
than what ListConditionNotifyPool needs and likely those extra bits
are the reason why QueuePool is slower than the stripped down
ListConditionNotifyPool.
However I do not understand why there is any difference between
the three List*Pool implementations. From looking at the
`threading.Condition` source code[0], the `threading.Condition.__enter__`
method simply calls its lock's `__enter__`. Therefore it seems to
me that ListLockPool and ListConditionPool should be more or less
identical. But from my observation ListConditionPool is always slightly
faster.
ListConditionNotifyPool is even more peculiar. The only difference
between ListCondtionPool and ListConditionNotifyPool is that the latter
calls `condition_object.notify()` at the end of each critiical section.
However there are no calls to `condition_object.wait()` and therefore
no "waiters" to wake up when calling `.notify()`. I don't understand
why simply calling `condition_object.notify()` provides a ~4x performance
improvement over ListConditionPool.
[0] https://github.com/python/cpython/blob/master/Lib/threading.py#L247
"""
import threading
import time
from queue import LifoQueue, Empty, Full
class ListLockPool:
def __init__(self, max_connections):
self.max_connections = max_connections
self.created_connections = 0
self.connections = []
self.lock = threading.Lock()
def get_connection(self):
with self.lock:
try:
connection = self.connections.pop()
except IndexError:
connection = self.make_connection()
return connection
def make_connection(self):
if self.created_connections >= self.max_connections:
raise Exception('Too Many Connections')
self.created_connections += 1
return object()
def release(self, connection):
with self.lock:
self.connections.append(connection)
def count(self):
return len(self.connections)
class ListConditionPool:
def __init__(self, max_connections):
self.max_connections = max_connections
self.created_connections = 0
self.connections = []
self.lock = threading.Lock()
self.not_empty = threading.Condition(self.lock)
self.not_full = threading.Condition(self.lock)
def get_connection(self):
with self.not_empty:
try:
connection = self.connections.pop()
except IndexError:
connection = self.make_connection()
return connection
def make_connection(self):
if self.created_connections >= self.max_connections:
raise Exception('Too Many Connections')
self.created_connections += 1
return object()
def release(self, connection):
with self.not_full:
self.connections.append(connection)
def count(self):
return len(self.connections)
class ListConditionNotifyPool:
def __init__(self, max_connections):
self.max_connections = max_connections
self.created_connections = 0
self.connections = []
self.lock = threading.Lock()
self.not_empty = threading.Condition(self.lock)
self.not_full = threading.Condition(self.lock)
def get_connection(self):
with self.not_empty:
try:
connection = self.connections.pop()
except IndexError:
connection = self.make_connection()
self.not_full.notify()
return connection
def make_connection(self):
if self.created_connections >= self.max_connections:
raise Exception('Too Many Connections')
self.created_connections += 1
return object()
def release(self, connection):
with self.not_full:
self.connections.append(connection)
self.not_empty.notify()
def count(self):
return len(self.connections)
class QueuePool:
def __init__(self, max_connections):
self.connections = LifoQueue(max_connections)
self.created_connections = 0
while True:
try:
self.connections.put(None, block=False)
except Full:
break
def get_connection(self):
connection = None
try:
connection = self.connections.get(block=False)
except Empty:
raise Exception('No Connection Available')
if connection is None:
connection = self.make_connection()
return connection
def make_connection(self):
self.created_connections += 1
return object()
def release(self, connection):
self.connections.put(connection, block=False)
def count(self):
return self.connections.qsize()
def worker(pool, stop_at, results):
x = 0
while time.time() < stop_at:
connection = pool.get_connection()
pool.release(connection)
x += 1
results.append(x)
def test(pool_class, num_threads, num_seconds):
threads = []
pool = pool_class(num_threads)
stop_at = time.time() + num_seconds
results = []
for i in range(num_threads):
threads.append(
threading.Thread(target=worker, args=(pool, stop_at, results))
)
for t in threads:
t.start()
for t in threads:
t.join()
print(f'{pool_class.__name__}: '
f'{sum(results):,} pool requests, '
f'{pool.created_connections} connections created, '
f'{pool.count()} connections in pool')
test(ListLockPool, 10, 10)
test(ListConditionPool, 10, 10)
test(ListConditionNotifyPool, 10, 10)
test(QueuePool, 10, 10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment