@simonw
Created October 5, 2019 17:46
Earlier attempt at Datasette connection pooling. Version from 8am, 2nd October 2019.
import contextlib
import threading
import sqlite3


class Connection:
    def __init__(self, name, db_conn):
        self.name = name
        self.db_conn = db_conn
        self.available = True
        self._lock = threading.Lock()

    def __repr__(self):
        return "{} {} ({})".format(
            self.name, self.db_conn, "available" if self.available else "locked"
        )
class ConnectionGroup:
    def __init__(self, name, db_conn, limit=3):
        self.name = name
        self.db_conn = db_conn
        self.connections = []
        self._lock = threading.Lock()
        self.limit = limit
        self._semaphore = threading.Semaphore(value=limit)

    @contextlib.contextmanager
    def connection(self):
        def _get_connection_or_semaphore():
            with self._lock:
                # Return next available connection OR create a new one
                for conn in self.connections:
                    if conn._lock.acquire(False):
                        return conn, None, False
                # If we get here, either no connections OR all locked
                if len(self.connections) < self.limit:
                    # Create a new connection and acquire its lock while we
                    # still hold self._lock, so no other thread can grab it.
                    # (The original referenced name and self.databases[name],
                    # which are not defined on this class - self.name and
                    # self.db_conn are assumed here instead.)
                    conn = Connection(self.name, sqlite3.connect(self.db_conn))
                    self.connections.append(conn)
                    conn._lock.acquire()
                    return conn, None, True
                else:
                    # Block on semaphore awaiting free connection
                    return None, self._semaphore, False

        conn, semaphore, created_new = _get_connection_or_semaphore()
        if conn:
            try:
                # The connection's lock is already held at this point,
                # whether it was just created or reused
                self._semaphore.acquire()
                yield conn
            finally:
                conn._lock.release()
                self._semaphore.release()
        else:
            # Block on semaphore and try again - this branch was left
            # unfinished in this earlier attempt (nothing is yielded here)
            self._semaphore.acquire()
class Pool:
    max_connections_per_database = 3
    lock = threading.RLock()

    def __init__(self, databases=None):
        self.databases = {}
        self.conditions = {}
        self.connections = {}
        for key, value in (databases or {}).items():
            self.add_database(key, value)

    def add_database(self, name, filepath):
        self.databases[name] = filepath
        self.conditions[name] = threading.Condition()

    @contextlib.contextmanager
    def connection(self, name):
        assert name in self.databases
        connection = None
        block_on_condition = None
        with self.lock:
            # Inside this global lock we decide what our connection strategy
            # is going to be - we may opt to create a new connection, reuse
            # an existing one or block waiting for a connection to become free
            connections = self.connections.get(name) or []
            available = [c for c in connections if c.available]
            if available:
                # Use first available - though we could randomize or
                # round-robin connection usage here in the future
                connection = available[0]
            else:
                # None available - open a new one or block
                if len(connections) < self.max_connections_per_database:
                    # Open a new connection and add it to the list
                    connection = Connection(name, sqlite3.connect(self.databases[name]))
                    connections.append(connection)
                    self.connections[name] = connections
                else:
                    # All connections are in use! Block until one is available
                    block_on_condition = self.conditions[name]
        if block_on_condition is not None:
            print("block on condition", block_on_condition, self.connections)
            if block_on_condition._is_owned():
                block_on_condition.wait()
            connections = self.connections[name]
            connection = [c for c in connections if c.available][0]
        with self.conditions[name]:
            connection.available = False
            yield connection
            connection.available = True
            self.conditions[name].notify()


# pool = Pool(
#     {
#         "trees": "/Users/simonw/Dropbox/Development/sf-trees.db",
#         "healthkit": "/Users/simonw/Dropbox/dogsheep/healthkit.db",
#     }
# )
# with pool.connection("fixtures") as conn:
#     conn.set_time_limit(1000)
#     conn.allow_all()
#     conn.execute(...)
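For reference, here is a minimal usage sketch (not part of the original gist) that exercises the Pool class above against an in-memory SQLite database. The "demo" database name is illustrative only; db_conn on the yielded Connection wrapper is the underlying sqlite3 connection.

# Hypothetical usage sketch, assuming the Connection / Pool definitions above are in scope
pool = Pool({"demo": ":memory:"})

with pool.connection("demo") as conn:
    # conn is a Connection wrapper; conn.db_conn is the raw sqlite3 connection
    print(conn.db_conn.execute("select sqlite_version()").fetchone())
    print(conn.available)  # False while checked out

# On exiting the block the connection is marked available for reuse
print(pool.connections["demo"][0].available)  # True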
from datasette.pool import Pool
import threading
import pytest
import time


@pytest.fixture
def pool():
    return Pool({"one": ":memory:"})


def test_create_connection(pool):
    assert {} == pool.connections
    with pool.connection("one") as conn:
        assert not conn.available
    assert conn.available
    assert {"one": [conn]} == pool.connections
def test_reuse_connection(pool):
    with pool.connection("one"):
        pass
    assert pool.connections["one"][0].available
    with pool.connection("one") as conn:
        assert conn is pool.connections["one"][0]
        assert not conn.available
    assert pool.connections["one"][0].available
def test_create_second_connection_if_first_is_not_available(pool):
    with pool.connection("one"):
        pass
    connections = pool.connections["one"]
    connections[0].available = False
    assert 1 == len(connections)
    with pool.connection("one") as conn:
        # Should be two connections now, both unavailable
        assert 2 == len(connections)
        assert all(not c.available for c in connections)
        assert conn in connections
    # Should still be two connections, both available
    assert 2 == len(connections)
    assert all(c.available for c in connections)
def test_block_until_connection_is_released(pool):
    # If we have hit max connections already, block until
    # a connection is released by another thread
    pool.max_connections_per_database = 1

    def block_connection(pool):
        with pool.connection("one"):
            time.sleep(0.2)

    t = threading.Thread(target=block_connection, args=[pool])
    assert {} == pool.connections
    t.start()
    # Give thread time to grab the connection:
    time.sleep(0.05)
    # Thread should now have grabbed and reserved the connection:
    assert 1 == len(pool.connections["one"])
    assert not pool.connections["one"][0].available
    start = time.time()
    # Now we attempt to use the connection. This should block.
    with pool.connection("one"):
        assert 1 == len(pool.connections["one"])
        assert not pool.connections["one"][0].available
    # This should have taken more than 0.1 seconds
    end = time.time()
    assert (end - start) > 0.1
    # Ensure thread has run to completion before ending test:
    t.join()
    # Connection should be available at the end
    assert pool.connections["one"][0].available
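The tests import Pool from datasette.pool, so they assume the pool module above is saved inside the datasette package. A sketch of one possible layout and test invocation follows; the file names are assumptions, not something the gist states.

# Assumed layout (hypothetical - the gist does not name its files):
#   datasette/pool.py    <- the Connection / ConnectionGroup / Pool code above
#   tests/test_pool.py   <- the pytest tests above
#
# Run the tests with:
#   pytest tests/test_pool.py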