Skip to content

Instantly share code, notes, and snippets.

@chandramouli-r
Last active June 12, 2023 03:59
Show Gist options
  • Save chandramouli-r/f8adf3b765919ceb54dea9126a2d97f2 to your computer and use it in GitHub Desktop.
Save chandramouli-r/f8adf3b765919ceb54dea9126a2d97f2 to your computer and use it in GitHub Desktop.
// Eviction of idle and maxTtl connections is handled by a periodic thread in SqlConnectionPool
// The logic there ensures that all such connections with slot.usage == 0 are removed.
// If such connections are in use when the check runs, they will be removed in a subsequent cycle.
// However, if such connections are always active, the expiration check needs to be done in Recycle.
// With the above logic, we need to refresh the pool with new connections in the following scenarios:
// 1) Pool initialization and/or periodic checker within the pool ---> which context will these be created in? Maybe distribute the new connections across existing contexts?
// An alternative is to add a "refill" API to SimpleConnectionPool, and the client can then periodically call this API
// The context will be passed in, and the refill API will create new Connections if existing connections are < minIdle.
// 2) Whenever we "remove" a connection ---> create a connection in the same slot/context - removal can be done as part of recycle or evict
private static class Recycle<C> implements Executor.Action<SimpleConnectionPool<C>> {
private final Slot<C> slot;
public Recycle(Slot<C> slot) {
this.slot = slot;
}
@Override
public Task execute(SimpleConnectionPool<C> pool) {
if (!pool.closed && slot.connection != null && ! slot.inactive) {
PoolWaiter<C> waiter;
if (slot.createConnectionTimeMs + poolTtlMs >= nowInMs()) {
slot.usage--;
slot.inactive = true;
if (slot.usage == 0) {
pool.remove(slot);
}
} else if (slot.usage <= slot.concurrency && (waiter = pool.waiters.poll()) != null) {
LeaseImpl<C> lease = new LeaseImpl<>(slot, waiter.handler);
return new Task() {
@Override
public void run() {
lease.emit();
}
};
} else {
slot.usage--;
}
}
return null;
}
}
private void recycle(LeaseImpl<C> lease) {
if (lease.recycled) {
throw new IllegalStateException("Attempt to recycle more than permitted");
}
lease.recycled = true;
execute(new Recycle<>(lease.slot));
}
private static final BiFunction<PoolWaiter, List<PoolConnection>, PoolConnection> SAME_EVENT_LOOP_SELECTOR = (waiter, list) -> {
int size = list.size();
for (int i = 0;i < size;i++) {
PoolConnection slot = list.get(i);
if (slot.context().nettyEventLoop() == waiter.context().nettyEventLoop() && ! slot.inactive && slot.available() > 0) {
return slot;
}
}
return null;
};
/**
* Select the first available connection.
*/
private static final BiFunction<PoolWaiter, List<PoolConnection>, PoolConnection> FIRST_AVAILABLE_SELECTOR = (waiter, list) -> {
int size = list.size();
for (int i = 0;i < size;i++) {
PoolConnection slot = list.get(i);
if (! slot.inactive && slot.available() > 0) {
return slot;
}
}
return null;
};
// In ConnectSuccess
// add slot.inactive = false;
// add slot.createTimeMs = now_in_ms();
// In Evict
// add
for (Slot<C> slot : removed) {
slot.inactive = true; ---> this;
pool.remove(slot);
}
// In Remove
// pool.refillOne(slot.context(), PoolWaiter.NULL_LISTENER, 0, handler)
// Refill
private static class RefillOne<C> extends PoolWaiter<C> implements Executor.Action<SimpleConnectionPool<C>> {
public RefillOne(ContextInternal context, PoolWaiter.Listener<C> listener, int capacity, Handler<AsyncResult<Lease<C>>> handler) {
super(listener, context, capacity, handler);
}
@Override
public Task execute(SimpleConnectionPool<C> pool) {
if (pool.closed) {
return new Task() {
@Override
public void run() {
context.emit(POOL_CLOSED, handler);
}
};
}
// 2. Try create connection
if (pool.capacity < pool.minIdleCapacity) {
pool.capacity += capacity;
EventLoopContext connectionContext = pool.contextProvider.apply(context);
Slot<C> slot2 = new Slot<>(pool, connectionContext, pool.size, capacity);
pool.slots[pool.size++] = slot2;
pool.requests++;
return new Task() {
@Override
public void run() {
if (listener != null) {
listener.onConnect(Acquire.this);
}
pool.connect(slot2, Acquire.this);
}
};
} else {
return new Task() {
@Override
public void run() {
context.emit(Future.failedFuture(new ConnectionPoolTooBusyException("Connection pool reached max wait queue size of " + pool.maxWaiters)), handler);
}
};
}
}
}
@Override
public void refillOne(ContextInternal context, PoolWaiter.Listener<C> listener, int kind, Handler<AsyncResult<Lease<C>>> handler) {
execute(new RefillOne<>(context, listener, capacityFactors[kind], handler));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment