Created
October 5, 2019 17:46
-
-
Save simonw/7111263f93595693b13d0776f92066c7 to your computer and use it in GitHub Desktop.
An earlier attempt at Datasette connection pooling (version from 8 a.m. on 2 October 2019).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import contextlib | |
import threading | |
import sqlite3 | |
class Connection:
    """A single pooled SQLite connection plus its checkout state."""

    def __init__(self, name, db_conn):
        """
        :param name: label for the connection (callers pass the database or
            group name).
        :param db_conn: the underlying connection object; callers in this
            module pass the result of ``sqlite3.connect()``.
        """
        self.name = name
        self.db_conn = db_conn
        # False while a caller has this connection checked out
        self.available = True
        # Per-connection lock that a pool can use to claim exclusive use
        self._lock = threading.Lock()

    def __repr__(self):
        return "{} {} ({})".format(
            self.name, self.db_conn, "available" if self.available else "locked"
        )
class ConnectionGroup:
    """A bounded, thread-safe group of SQLite connections to one database.

    At most ``limit`` connections are ever opened. Callers beyond that block
    in ``connection()`` until another caller releases its connection.
    """

    def __init__(self, name, db_conn, limit=3):
        """
        :param name: label copied onto every Connection this group opens.
        :param db_conn: value passed to ``sqlite3.connect()`` when a new
            connection is opened (presumably a database path or
            ``":memory:"`` -- TODO confirm against callers).
        :param limit: maximum number of connections, and therefore of
            concurrent users.
        """
        self.name = name
        self.db_conn = db_conn
        self.connections = []
        # Guards self.connections and the check-then-create step in _checkout()
        self._lock = threading.Lock()
        self.limit = limit
        # One permit per connection slot: bounds concurrent checkouts to `limit`
        self._semaphore = threading.Semaphore(value=limit)

    def _checkout(self):
        """Lock and return an idle connection, opening a new one if needed.

        Must only be called while holding a permit from ``self._semaphore``.
        Every other holder of a locked connection also holds a permit, so at
        most ``limit - 1`` connections can be busy. If every existing
        connection is busy, there is therefore room to open another.
        """
        with self._lock:
            for conn in self.connections:
                # Non-blocking attempt: skip connections another caller holds
                if conn._lock.acquire(False):
                    return conn
            conn = Connection(
                self.name,
                # The connection may later be used by a thread other than the
                # one that opened it; its _lock guarantees one user at a time.
                sqlite3.connect(self.db_conn, check_same_thread=False),
            )
            conn._lock.acquire()
            self.connections.append(conn)
            return conn

    @contextlib.contextmanager
    def connection(self):
        """Yield an exclusively held Connection, blocking while all are busy.

        The connection goes back to the group when the ``with`` block exits,
        including when the block raises.
        """
        self._semaphore.acquire()
        try:
            conn = self._checkout()
            conn.available = False
            try:
                yield conn
            finally:
                conn.available = True
                conn._lock.release()
        finally:
            self._semaphore.release()
class Pool:
    """Per-database pools of SQLite connections, shared between threads.

    Each database registered with ``add_database()`` gets up to
    ``max_connections_per_database`` connections, opened lazily on first
    use. When all of them are checked out, ``connection()`` blocks until one
    is returned.
    """

    max_connections_per_database = 3
    # NOTE: class-level, so shared by every Pool instance. connection()
    # synchronizes on the per-database Condition instead; this is kept so
    # existing references to Pool.lock keep working.
    lock = threading.RLock()

    def __init__(self, databases=None):
        """
        :param databases: optional mapping of database name -> value passed
            to ``sqlite3.connect()`` (e.g. a file path or ``":memory:"``).
        """
        self.databases = {}
        # name -> Condition guarding that database's connection list and the
        # ``available`` flag of each of its connections
        self.conditions = {}
        # name -> list of Connection objects, created on first use of name
        self.connections = {}
        for key, value in (databases or {}).items():
            self.add_database(key, value)

    def add_database(self, name, filepath):
        """Register ``filepath`` under ``name``; no connection is opened yet."""
        self.databases[name] = filepath
        self.conditions[name] = threading.Condition()

    @contextlib.contextmanager
    def connection(self, name):
        """Check out a Connection to database ``name`` for a ``with`` block.

        The pool reuses an idle connection if there is one. Otherwise it
        opens a new one while under ``max_connections_per_database``, and
        failing that it waits until another thread returns one. The
        connection is marked unavailable while checked out. It is returned
        to the pool when the block exits, even if the block raises.
        """
        assert name in self.databases
        condition = self.conditions[name]
        with condition:
            while True:
                connections = self.connections.get(name) or []
                available = [c for c in connections if c.available]
                if available:
                    # Use first available - though we could randomize or
                    # round-robin connection usage here in the future
                    connection = available[0]
                    break
                if len(connections) < self.max_connections_per_database:
                    # Pooled connections are handed to whichever thread asks
                    # next, so sqlite3's same-thread check must be disabled;
                    # the available flag keeps it to one user at a time.
                    db_conn = sqlite3.connect(
                        self.databases[name], check_same_thread=False
                    )
                    connection = Connection(name, db_conn)
                    connections.append(connection)
                    self.connections[name] = connections
                    break
                # All connections are in use: release the lock and sleep until
                # a returning thread calls notify(). Loop to re-check, because
                # another thread may have claimed the connection first.
                condition.wait()
            # Reserve while still holding the lock so that no other thread
            # can select this same connection.
            connection.available = False
        try:
            yield connection
        finally:
            with condition:
                connection.available = True
                condition.notify()
# pool = Pool( | |
# { | |
# "trees": "/Users/simonw/Dropbox/Development/sf-trees.db", | |
# "healthkit": "/Users/simonw/Dropbox/dogsheep/healthkit.db", | |
# } | |
# ) | |
# with pool.connection("fixtures") as conn: | |
# conn.set_time_limit(1000) | |
# conn.allow_all() | |
# conn.execute(...) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datasette.pool import Pool | |
import threading | |
import pytest | |
import time | |
@pytest.fixture
def pool():
    """Build a Pool holding one database, "one", backed by SQLite ":memory:".

    Each sqlite3 connection to ":memory:" opens its own private, empty
    database. That is fine for these tests, which never run queries.
    """
    single_database = {"one": ":memory:"}
    return Pool(single_database)
def test_create_connection(pool):
    """The first checkout lazily opens a connection and marks it as in use."""
    assert pool.connections == {}
    with pool.connection("one") as opened:
        # Reserved for the duration of the with-block
        assert not opened.available
    # Returned to the pool on exit, and recorded under its database name
    assert opened.available
    assert pool.connections == {"one": [opened]}
def test_reuse_connection(pool):
    """A released connection is handed out again instead of opening a new one."""

    def first_connection():
        return pool.connections["one"][0]

    with pool.connection("one"):
        pass
    assert first_connection().available
    with pool.connection("one") as reused:
        assert reused is first_connection()
        assert not reused.available
    assert first_connection().available
def test_create_second_connection_if_first_is_not_available(pool):
    """A new connection is opened when the only existing one is busy."""
    with pool.connection("one"):
        pass
    connections = pool.connections["one"]
    # Simulate another caller still holding the first connection
    connections[0].available = False
    assert 1 == len(connections)
    with pool.connection("one") as conn:
        # Should be two connections now, both unavailable. Note that
        # all(x for ...) is used, not all([x] for ...): one-element lists
        # are always truthy, which would make the check vacuous.
        assert 2 == len(connections)
        assert all(not c.available for c in connections)
        assert conn in connections
        assert conn is connections[1]
    # Still two connections. The new one has been released, while the first
    # is still marked as held by the simulated other caller.
    assert 2 == len(connections)
    assert not connections[0].available
    assert connections[1].available
def test_block_until_connection_is_released(pool):
    # Once the pool is at its connection limit, a caller must wait until
    # another thread releases a connection.
    pool.max_connections_per_database = 1

    def hold_for_a_while(p):
        with p.connection("one"):
            time.sleep(0.2)

    holder = threading.Thread(target=hold_for_a_while, args=(pool,))
    assert pool.connections == {}
    holder.start()
    # Let the holder thread check out the pool's only connection first
    time.sleep(0.05)
    held = pool.connections["one"]
    assert len(held) == 1
    assert not held[0].available
    began = time.time()
    # The only connection is taken, so this must wait for the holder to finish
    with pool.connection("one"):
        assert len(pool.connections["one"]) == 1
        assert not pool.connections["one"][0].available
    # The holder keeps its connection for 0.2s and we started about 0.05s in,
    # so the wait should have taken well over 0.1s
    waited = time.time() - began
    assert waited > 0.1
    # Let the holder thread finish before the test ends
    holder.join()
    # The connection should be back in the pool
    assert pool.connections["one"][0].available
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment