Connection pools are an important building block for networked applications. Wikipedia describes them well:
In software engineering, a connection pool is a cache of database connections maintained so that the connections can be reused when future requests to the database are required. Connection pools are used to enhance the performance of executing commands on a database. Opening and maintaining a database connection for each user, especially requests made to a dynamic database-driven website application, is costly and wastes resources. In connection pooling, after a connection is created, it is placed in the pool and it is used again so that a new connection does not have to be established. If all the connections are being used, a new connection is made and is added to the pool. Connection pooling also cuts down on the amount of time a user must wait to establish a connection to the database.
Building an abstract connection pool implementation has been a long-standing goal of the Swift on Server community. There was an explicit pitch post about a Generic Connection Pool in 2020. The last SSWG annual update called it out explicitly again.
Community members have also requested connection pool advice at times.
While building a simple connection pool is relatively straightforward, there are many potential gotchas in more advanced (full-featured) connection pools.
The ecosystem has some connection pool implementations around:
Nearly all of these implementations depend heavily on SwiftNIO and use EventLoopFutures and EventLoopPromises to signal async completion events. With the ecosystem moving away from EventLoopFutures and embracing structured concurrency, we have an opportunity to build an abstract connection pool that has a chance of being widely adopted across the ecosystem.
Looking at the above implementations we can see common patterns that can help us identify our design goals:
Our current design goals are:
- All concurrent code shall use structured concurrency (zero unstructured tasks)
- Except when there are clear performance benefits to using closure-based callbacks
- The connection pool shall support different connection types:
- persisted connections (Connections that are established even if there are no requests for a connection)
- dropped-when-idle connections (Connections that are established when there is demand for connections and that are closed after some idle timeout duration.)
- overflow connections (Connections that are established when there is demand for connections and that are closed immediately after demand has been fulfilled.)
- Support for multiplexed connections
- The implementation shall be useful to all Swift developers and shall not be focused solely on the Swift on server use-case.
- Users shall be able to get metrics from inside the connection pool.
- The connection pool shall support keep-alive behaviors
- The connection pool shall be an implementation detail in client libraries. Application developers shall not directly interact with it. Client library authors shall use the generic ConnectionPool to implement their *Clients.
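To make the three connection types above concrete, here is a small illustration (not part of the proposed API) of how they relate to the pool limits introduced later in `ConnectionPoolConfiguration`. The `ConnectionTier` enum and `tier(ofConnectionAt:minimum:softLimit:hardLimit:)` helper are purely hypothetical:

```swift
// An illustration of how the three connection types relate to the pool
// limits `minimumConnectionCount`, `maximumConnectionSoftLimit` and
// `maximumConnectionHardLimit` described later in this document.
enum ConnectionTier {
    case persisted        // kept alive even without demand
    case droppedWhenIdle  // closed after the idle timeout
    case overflow         // closed as soon as demand is fulfilled
}

// Classifies the n-th connection (0-based) the pool would open.
// Returns nil if the hard limit does not permit another connection.
func tier(
    ofConnectionAt index: Int,
    minimum: Int,
    softLimit: Int,
    hardLimit: Int
) -> ConnectionTier? {
    switch index {
    case 0..<minimum: return .persisted
    case minimum..<softLimit: return .droppedWhenIdle
    case softLimit..<hardLimit: return .overflow
    default: return nil
    }
}
```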
As stated in previous connection pool discussions: we expect that there are connection pool use-cases that we cannot solve with a generic approach, as those use-cases might be hard to abstract over. An example of this is AsyncHTTPClient, which supports HTTP/1 and HTTP/2 connections at the same time and can switch between HTTP versions at runtime.
We want to provide a Swift package that offers an abstract Connection Pool.
Instead of creating a separate package for the Connection Pool from the get-go, we want to land the Connection Pool in its own module (ConnectionPoolModule) inside the Postgres package PostgresNIO.
This has the advantage that PostgresNIO can adopt the new ConnectionPool without taking on a pre-1.0 dependency. This way we can gain confidence in the implementation while it is used in Postgres in production, and move it into its own repo whenever we think it is ready.
Authors of other libraries can still build proofs of concept for their libraries by depending on PostgresNIO.
To build an abstract Connection Pool we need a configuration object and protocols that allow modifying the pool's behavior.
In this document we first introduce all helper types before putting them together in the Connection Pool.
We need a protocol to abstract over the connections that the ConnectionPool shall maintain for us.
/// A connection that can be pooled in a ``ConnectionPool``
public protocol PooledConnection: AnyObject, Sendable {
    /// The connection's identifier type.
    associatedtype ID: Hashable & Sendable

    /// The connection's identifier. The identifier is passed to
    /// the connection factory method and must stay attached to
    /// the connection at all times. It must not change during
    /// the connection's lifetime.
    var id: ID { get }

    /// A method to register closures that are invoked when the
    /// connection is closed. If the connection closes unexpectedly
    /// the closure shall be called with the underlying error.
    /// In most NIO clients this can be easily implemented by
    /// attaching to the `channel.closeFuture`:
    /// ```
    /// func onClose(
    ///     _ closure: @escaping @Sendable ((any Error)?) -> ()
    /// ) {
    ///     channel.closeFuture.whenComplete { _ in
    ///         closure(previousError)
    ///     }
    /// }
    /// ```
    func onClose(_ closure: @escaping @Sendable ((any Error)?) -> ())

    /// Close the running connection. Once the close has completed,
    /// closures that were registered in `onClose` must be
    /// invoked.
    func close()
}

We need a type that generates the connection IDs:
/// A connection ID generator. Its returned connection IDs will
/// be used when creating new ``PooledConnection``s.
public protocol ConnectionIDGeneratorProtocol: Sendable {
    /// The connection's identifier type.
    associatedtype ID: Hashable & Sendable

    /// The next connection ID that shall be used.
    func next() -> ID
}

The ConnectionPoolModule provides a default ConnectionIDGenerator that uses an internal atomic to generate Int connection IDs starting at 0.
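A conforming generator might look like the following sketch. The built-in generator uses an atomic counter; this standalone illustration uses an `NSLock`-protected counter instead, which is semantically equivalent. The `LockedConnectionIDGenerator` name and the protocol stub are assumptions for this example:

```swift
import Foundation

// The protocol stub mirrors the definition above.
protocol ConnectionIDGeneratorProtocol: Sendable {
    associatedtype ID: Hashable & Sendable
    func next() -> ID
}

// A sketch of a conforming generator: hands out 0, 1, 2, … and
// never repeats an ID. The real implementation uses an atomic Int
// instead of a lock.
final class LockedConnectionIDGenerator: ConnectionIDGeneratorProtocol, @unchecked Sendable {
    private let lock = NSLock()
    private var counter = 0

    func next() -> Int {
        self.lock.lock()
        defer { self.lock.unlock() }
        let id = self.counter
        self.counter += 1
        return id
    }
}
```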
We want to allow users to specify keep-alive behaviors. This could mean sending a PING frame on an HTTP/2 connection or a `SELECT 1` query for database drivers.

/// A keep-alive behavior for connections maintained by the pool
public protocol ConnectionKeepAliveBehavior: Sendable {
    /// The connection type
    associatedtype Connection: PooledConnection

    /// The time after which a keep-alive shall be triggered.
    /// If nil is returned, keep-alives are deactivated.
    var keepAliveFrequency: Duration? { get }

    /// This method is invoked when the keep-alive shall be run.
    func runKeepAlive(for connection: Connection) async throws
}

The ConnectionPoolModule provides a default NoOpKeepAliveBehavior that doesn't run any keep-alives.
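A conforming behavior might look like the following sketch. The `PingableConnection` protocol, the `PingKeepAliveBehavior` name, and the 30-second frequency are assumptions for this standalone example; the real `Connection` associated type is constrained to `PooledConnection`:

```swift
// Protocol stub mirroring the definition above (the PooledConnection
// constraint is dropped to keep the example self-contained).
protocol ConnectionKeepAliveBehavior: Sendable {
    associatedtype Connection
    var keepAliveFrequency: Duration? { get }
    func runKeepAlive(for connection: Connection) async throws
}

// An assumed connection capability: anything that can answer a
// lightweight liveness probe, e.g. an HTTP/2 PING or `SELECT 1`.
protocol PingableConnection: Sendable {
    func ping() async throws
}

// A sketch of a keep-alive behavior that probes the connection
// every 30 seconds of inactivity.
struct PingKeepAliveBehavior<Connection: PingableConnection>: ConnectionKeepAliveBehavior {
    var keepAliveFrequency: Duration? { .seconds(30) }

    func runKeepAlive(for connection: Connection) async throws {
        try await connection.ping()
    }
}
```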
The Connection Pool shall not depend on the packages swift-metrics, swift-log or swift-distributed-tracing. The reason for this is that we want the ConnectionPool implementation to be useful to all Swift developers, including developers for Apple platforms. The above observability libraries are widely used in the Swift on Server ecosystem, but are seldom used on Apple platforms, as there are alternatives that integrate more deeply with the underlying platform (such as os_log). Because of this, we need a library-agnostic way to inform users about the internal state of the connection pool. We want to use the delegate pattern for this:
/// A connection metric delegate
protocol ConnectionPoolObservabilityDelegate: Sendable {
    associatedtype ConnectionID: Hashable & Sendable

    /// A connection attempt for a connection with the provided
    /// ID has started. The connection attempt result will be
    /// reported as either ``connectSucceeded(id:streamCapacity:)``
    /// or ``connectFailed(id:error:)``.
    func startedConnecting(id: ConnectionID)

    /// A connection attempt failed with the given error.
    /// After some period of time ``startedConnecting(id:)`` may
    /// be called again.
    func connectFailed(id: ConnectionID, error: any Error)

    /// A connection was established for the provided ID.
    /// `streamCapacity` streams are available to use on the
    /// connection. The maximum number of available streams may
    /// change over time and is reported via `streamCapacity`.
    func connectSucceeded(id: ConnectionID, streamCapacity: UInt16)
}

The above list of callbacks is currently incomplete and will be extended as we make more progress on the implementation. The goal is to have a view into the connection pool like the one gRPC provides for its users today.
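A conforming delegate might keep simple counters and feed them into the metrics backend of the adopter's choice. The `CountingObservabilityDelegate` below is a standalone sketch; the protocol stub mirrors the callbacks above and the counting logic is an illustration:

```swift
import Foundation

// Protocol stub mirroring the delegate callbacks above.
protocol ConnectionPoolObservabilityDelegate: Sendable {
    associatedtype ConnectionID: Hashable & Sendable
    func startedConnecting(id: ConnectionID)
    func connectFailed(id: ConnectionID, error: any Error)
    func connectSucceeded(id: ConnectionID, streamCapacity: UInt16)
}

// A sketch of a delegate that counts connect attempts, failures and
// successes behind a lock. A real adopter would forward these to
// swift-metrics, os_log, or any other backend.
final class CountingObservabilityDelegate: ConnectionPoolObservabilityDelegate, @unchecked Sendable {
    private let lock = NSLock()
    private var counts = (attempts: 0, failures: 0, successes: 0)

    func startedConnecting(id: Int) {
        self.lock.lock(); defer { self.lock.unlock() }
        self.counts.attempts += 1
    }

    func connectFailed(id: Int, error: any Error) {
        self.lock.lock(); defer { self.lock.unlock() }
        self.counts.failures += 1
    }

    func connectSucceeded(id: Int, streamCapacity: UInt16) {
        self.lock.lock(); defer { self.lock.unlock() }
        self.counts.successes += 1
    }

    var snapshot: (attempts: Int, failures: Int, successes: Int) {
        self.lock.lock(); defer { self.lock.unlock() }
        return self.counts
    }
}
```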
To lease a connection from the pool, users need to implement the ConnectionRequestProtocol. The reason for this is that the ConnectionPool base API uses unstructured concurrency to lease and release connections. See ConnectionPool for an in-depth discussion of why that is the case.
/// A request to get a connection from the `ConnectionPool`
public protocol ConnectionRequestProtocol: Sendable {
    /// A connection lease request ID type.
    associatedtype ID: Hashable & Sendable

    /// The leased connection type
    associatedtype Connection: PooledConnection

    /// A connection lease request ID. This ID must be generated
    /// by users of the `ConnectionPool` outside the
    /// `ConnectionPool`. It is not generated inside the pool like
    /// the `ConnectionID`s. The lease request ID must be unique
    /// and must not change if your implementing type is a
    /// reference type.
    var id: ID { get }

    /// A function that is called with a connection or a
    /// `PoolError`.
    func complete(with: Result<Connection, PoolError>)
}

We explain why we opted for unstructured concurrency here in Reasons for the low-level unstructured concurrency API.
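A minimal conforming request could simply forward the result to a stored callback. This sketch stands alone: `PoolError`, `MockConnection` and `CallbackConnectionRequest` are stand-ins invented for this example, and the protocol stubs mirror the definitions in this document:

```swift
// Stand-in for the module's real error type.
struct PoolError: Error {}

// Protocol stubs mirroring the definitions in this document.
protocol PooledConnection: AnyObject, Sendable {
    associatedtype ID: Hashable & Sendable
    var id: ID { get }
    func onClose(_ closure: @escaping @Sendable ((any Error)?) -> ())
    func close()
}

protocol ConnectionRequestProtocol: Sendable {
    associatedtype ID: Hashable & Sendable
    associatedtype Connection: PooledConnection
    var id: ID { get }
    func complete(with result: Result<Connection, PoolError>)
}

// A minimal pooled connection for illustration.
final class MockConnection: PooledConnection, @unchecked Sendable {
    let id: Int
    private var closeHandlers: [@Sendable ((any Error)?) -> ()] = []

    init(id: Int) { self.id = id }

    func onClose(_ closure: @escaping @Sendable ((any Error)?) -> ()) {
        self.closeHandlers.append(closure)
    }

    func close() {
        // A graceful close reports no error to the registered closures.
        self.closeHandlers.forEach { $0(nil) }
    }
}

// A sketch of a conforming request: the pool calls `complete(with:)`
// once, and the result is forwarded to the stored callback.
struct CallbackConnectionRequest: ConnectionRequestProtocol {
    typealias Connection = MockConnection

    // The lease request ID; generated by the pool's user.
    var id: Int
    // Invoked exactly once with the lease result.
    var onComplete: @Sendable (Result<MockConnection, PoolError>) -> ()

    func complete(with result: Result<MockConnection, PoolError>) {
        self.onComplete(result)
    }
}
```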
We further need a method that establishes new connections for the connection pool. This is a closure that users provide on ConnectionPool initialization. The input parameters are the connection ID as well as a reference to the ConnectionPool. We explicitly pass the ConnectionPool so that connections can keep a reference to it, to inform the pool about changes in the number of streams they can handle or to signal that they have room for further requests. Users must return a ConnectionAndMetadata object from the factory. The ConnectionAndMetadata holds the newly created connection and the number of available streams on it.
public typealias ConnectionFactory = @Sendable (ConnectionID, ConnectionPool) async throws -> ConnectionAndMetadata<Connection>

public struct ConnectionAndMetadata<Connection: PooledConnection> {
    public var connection: Connection
    public var maximumStreamsOnConnection: UInt16

    public init(
        connection: Connection,
        maximumStreamsOnConnection: UInt16 = 1
    )
}

public struct ConnectionPoolConfiguration {
    /// The minimum number of connections to preserve in the pool.
    ///
    /// If the pool is mostly idle and the remote server closes
    /// idle connections, the `ConnectionPool` will initiate new
    /// outbound connections proactively to avoid the number of
    /// available connections dropping below this number.
    public var minimumConnectionCount: Int

    /// Between the `minimumConnectionCount` and
    /// `maximumConnectionSoftLimit` the connection pool creates
    /// _preserved_ connections. Preserved connections are closed
    /// if they have been idle for ``idleTimeout``.
    public var maximumConnectionSoftLimit: Int

    /// The maximum number of connections for this pool that can
    /// exist at any point in time. The pool can create _overflow_
    /// connections if all connections are leased and
    /// `maximumConnectionHardLimit` > `maximumConnectionSoftLimit`.
    /// Overflow connections are closed immediately as soon as they
    /// become idle.
    public var maximumConnectionHardLimit: Int

    /// The time that a _preserved_ idle connection stays in the
    /// pool before it is closed.
    public var idleTimeout: Duration

    /// Default initializer
    public init()
}

The connection pool interface shall be the following:
/// A connection pool
public final class ConnectionPool<
    Connection: PooledConnection,
    ConnectionID: Hashable & Sendable,
    ConnectionIDGenerator: ConnectionIDGeneratorProtocol,
    Request: ConnectionRequestProtocol,
    RequestID: Hashable & Sendable,
    KeepAliveBehavior: ConnectionKeepAliveBehavior,
    ObservabilityDelegate: ConnectionPoolObservabilityDelegate,
    Clock: _Concurrency.Clock
>: @unchecked Sendable where
    Connection.ID == ConnectionID,
    ConnectionIDGenerator.ID == ConnectionID,
    Request.Connection == Connection,
    Request.ID == RequestID,
    KeepAliveBehavior.Connection == Connection,
    ObservabilityDelegate.ConnectionID == ConnectionID,
    Clock.Duration == Duration
{
    public typealias ConnectionFactory = @Sendable (ConnectionID, ConnectionPool<Connection, ConnectionID, ConnectionIDGenerator, Request, RequestID, KeepAliveBehavior, ObservabilityDelegate, Clock>) async throws -> ConnectionAndMetadata<Connection>

    /// Creates a connection pool.
    public init(
        configuration: ConnectionPoolConfiguration,
        idGenerator: ConnectionIDGenerator,
        requestType: Request.Type,
        keepAliveBehavior: KeepAliveBehavior,
        observabilityDelegate: ObservabilityDelegate,
        clock: Clock,
        connectionFactory: @escaping ConnectionFactory
    )

    /// Lease a connection for the provided request. If a connection
    /// becomes available, the request's `complete` function will be
    /// invoked.
    /// If your pool manages multiplexed connections, this request
    /// matches a request to a single stream.
    public func leaseConnection(_ request: Request)

    /// Lease a connection for a sequence of requests. Once
    /// connections become available, each request will receive a
    /// call to its `complete` function.
    /// If your pool manages multiplexed connections, each request
    /// gets access to a single stream.
    public func leaseConnections(
        for requests: some Sequence<Request>
    )

    /// Cancel a connection lease request by its ``RequestID``.
    /// If the request is waiting for a connection in the pool,
    /// the request is cancelled and will receive a call to its
    /// complete method with a failure result and a cancellation
    /// error.
    /// If a request has already been completed with a connection,
    /// the pool can't cancel it anymore. It is up to the user to
    /// cancel the already running request.
    public func cancelLeaseConnection(_ requestID: RequestID)

    /// Return a connection to the pool. If your pool manages
    /// multiplexed connections, this is equivalent to returning
    /// a single stream to the pool.
    /// Users must call this once they are done using the leased
    /// connection/stream.
    public func releaseConnection(
        _ connection: Connection,
        streams: UInt16 = 1
    )

    /// Mark a connection as going away. Connection implementors
    /// should call this method if the connection has received
    /// a close intent from the server. For example: an HTTP/2
    /// GOAWAY frame.
    ///
    /// For this, connection implementors have to preserve a
    /// reference to the pool in the connection. They should hold
    /// on to the reference that is passed in the
    /// `ConnectionFactory`.
    public func connectionWillClose(_ connection: Connection)

    /// If the max stream setting has changed for a multiplexed
    /// connection, the new setting must be communicated to the
    /// pool.
    ///
    /// For this, connection implementors have to preserve a
    /// reference to the pool in the connection. They should hold
    /// on to the reference that is passed in the
    /// `ConnectionFactory`.
    public func connection(
        _ connection: Connection,
        didReceiveNewMaxStreamSetting: UInt16
    )

    /// A run method, which is used for all pool background tasks.
    /// This includes connection creation, running keep-alives and
    /// running timers.
    /// It is not possible to lease a connection from the pool
    /// without calling `run()` on it.
    /// To close a ``ConnectionPool``, cancel the task that is
    /// running the run method.
    public func run() async
}

While most APIs in the new ConnectionPool use structured concurrency – for example creating connections or running a keep-alive – the lease and release functions are unstructured.
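The lifecycle described for `run()` can be sketched with a stub pool. The `StubPool` type is an assumption for this example; the real pool drives connection creation, keep-alives and timers inside `run()`:

```swift
import Foundation

// `run()` hosts all background work; the pool shuts down when the task
// that runs it is cancelled. The stub just parks until cancellation.
final class StubPool: Sendable {
    func run() async {
        while !Task.isCancelled {
            // `Task.sleep` throws on cancellation; the loop condition
            // then observes the cancellation and exits.
            try? await Task.sleep(nanoseconds: 10_000_000)
        }
    }
}

// Usage: keep `run()` alive for the pool's lifetime, cancel to close.
let pool = StubPool()
let runner = Task { await pool.run() }
// ... lease and release connections while the pool is running ...
runner.cancel()  // equivalent to closing the pool
```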
To allow easier adoption, we offer an implementation of the ConnectionRequestProtocol called ConnectionRequest that allows users to embrace structured concurrency everywhere:
struct ConnectionRequest<Connection: PooledConnection>: ConnectionRequestProtocol {
    public typealias ID = Int

    public func complete(
        with result: Result<Connection, PoolError>
    )
}

extension ConnectionPool where Request == ConnectionRequest<Connection> {
    public func leaseConnection() async throws -> Connection

    public func withConnection<Result>(
        _ closure: (Connection) async throws -> Result
    ) async throws -> Result
}

The ConnectionPool base API does not offer an asynchronous lease function or a scoped withConnection function; those are only available in the Request == ConnectionRequest case shown above. The reason for this is performance. We want adopters to be able to rely on the ConnectionPool even in high-throughput scenarios. Structured concurrency might introduce thread hops or unnecessary waiting – even once Task executors have landed – that we can prevent by using unstructured concurrency.
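An async lease function like the one above can be layered on top of the request-based API with a continuation. In this standalone sketch the pool is replaced by a function that fulfills requests immediately; `PoolError`, `MockConnection`, `ContinuationRequest` and `poolLeaseConnection` are stand-ins invented for this example:

```swift
import Foundation

// Stand-ins for the module's real types.
struct PoolError: Error {}

final class MockConnection: Sendable {
    let id: Int
    init(id: Int) { self.id = id }
}

// A request that resumes a continuation when the pool completes it.
struct ContinuationRequest: Sendable {
    var id: Int
    var continuation: CheckedContinuation<MockConnection, any Error>

    func complete(with result: Result<MockConnection, PoolError>) {
        self.continuation.resume(with: result)
    }
}

// Stands in for the pool's callback-based `leaseConnection(_:)`. Here
// the request is fulfilled immediately; the real pool may complete it
// later from a connection's thread.
func poolLeaseConnection(_ request: ContinuationRequest) {
    request.complete(with: .success(MockConnection(id: request.id)))
}

// The structured wrapper: suspend the caller until the request-based
// API hands back a connection.
func leaseConnection(id: Int) async throws -> MockConnection {
    try await withCheckedThrowingContinuation { continuation in
        poolLeaseConnection(ContinuationRequest(id: id, continuation: continuation))
    }
}
```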
Let's look at two examples:
Let's look at what happens if all connections are busy and more work is scheduled on the pool. First, with a fully structured (continuation-based) lease API:

Task that leases:

1. call to `leaseConnection()` – there is currently no connection available
2. store a continuation in the pool
3. resume once a connection becomes available
4. once a connection is available, jump into the connection's thread/task

Connection thread:

1. something is running on the connection
2. the last item for the job is received
3. the last item is mirrored into structured concurrency -> this means a thread hop into the concurrent executor
4. the connection is released in the concurrent executor -> this triggers step 3 of the leasing task

This means: to schedule a single work item we need at least two thread hops.
Now the same scenario with the unstructured request-based lease API:

Task that leases:

1. call to `leaseConnection(_:)` – there is currently no connection available
2. store the request in the pool
3. once a connection becomes available, the request's `complete` method is called directly on the connection's thread

Connection thread:

1. something is running on the connection
2. the last item for the job is received
3. once the last item for a job has been received, the connection can autorelease itself
4. the connection is released on the connection's thread and more work can be scheduled on the same thread -> this triggers step 3 of the leasing task

This means: to schedule a work item we don't need any thread hops. We might have an implicit thread hop depending on where the request was enqueued in the pool.
Especially in scenarios where lots of work is enqueued very quickly (example: sending 1 million push notifications), the performance of the system can be constrained by the time it takes to schedule new work on the limited connections.
Database connections are shared objects that are expensive to create. Also, databases will only allow a certain number of open connections at any given moment.
If we embrace structured concurrency, the return of a connection might be delayed by slow stream consumers:
try await pool.withConnection { connection in
    for try await row in connection.query("SELECT generate_series(1, 1000)") {
        // slow consumption
    }
}

A Postgres connection often receives lots of rows at once from the server. It is not unlikely that the Postgres client receives the 1000 rows above in a single network read. However, due to structured concurrency, after the Postgres connection has received the end message for the above query it isn't immediately returned to the pool; instead, it has to wait until the user has completed their potentially slow iteration over the returned rows.
Offering a lower-level unstructured API allows client developers to create APIs that are fully structured for their users, but use unstructured concurrency internally to hand connections back to the pool as soon as possible:
for try await row in pool.query("SELECT generate_series(1, 1000)") {
    // slow consumption
}

We considered using structured concurrency everywhere. However, we decided against it for the performance reasons outlined in Reasons for the low-level unstructured concurrency API.
We considered adding timeouts to the lease connection function: once the deadline was hit, the lease request would fail automatically. We decided against this, as we think the Swift standard library should ideally provide an API that uses Task cancellation to enforce timeouts/deadlines everywhere.