Last active
June 12, 2023 03:59
-
-
Save chandramouli-r/f8adf3b765919ceb54dea9126a2d97f2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Eviction of idle and maxTtl connections is handled by a periodic thread in SqlConnectionPool | |
// The logic there ensures that all such connections with slot.usage == 0 are removed. | |
// If such connections are in use when the check runs, they will be removed in a subsequent cycle. | |
// However, if such connections are always active, the expiration check needs to be done in Recycle. | |
// With the above logic, we need to refresh the pool with new connections in the following scenarios: | |
// 1) Pool initialization and/or periodic checker within the pool ---> which context will these be created in? Maybe distribute the new connections across existing contexts? | |
// An alternative is to add a "refill" API to SimpleConnectionPool, and the client can then periodically call this API | |
// The context will be passed in, and the refill API will create new Connections if existing connections are < minIdle. | |
// 2) Whenever we "remove" a connection ---> create a connection in the same slot/context - removal can be done as part of recycle or evict | |
private static class Recycle<C> implements Executor.Action<SimpleConnectionPool<C>> { | |
private final Slot<C> slot; | |
public Recycle(Slot<C> slot) { | |
this.slot = slot; | |
} | |
@Override | |
public Task execute(SimpleConnectionPool<C> pool) { | |
if (!pool.closed && slot.connection != null && ! slot.inactive) { | |
PoolWaiter<C> waiter; | |
if (slot.createConnectionTimeMs + poolTtlMs >= nowInMs()) { | |
slot.usage--; | |
slot.inactive = true; | |
if (slot.usage == 0) { | |
pool.remove(slot); | |
} | |
} else if (slot.usage <= slot.concurrency && (waiter = pool.waiters.poll()) != null) { | |
LeaseImpl<C> lease = new LeaseImpl<>(slot, waiter.handler); | |
return new Task() { | |
@Override | |
public void run() { | |
lease.emit(); | |
} | |
}; | |
} else { | |
slot.usage--; | |
} | |
} | |
return null; | |
} | |
} | |
private void recycle(LeaseImpl<C> lease) { | |
if (lease.recycled) { | |
throw new IllegalStateException("Attempt to recycle more than permitted"); | |
} | |
lease.recycled = true; | |
execute(new Recycle<>(lease.slot)); | |
} | |
private static final BiFunction<PoolWaiter, List<PoolConnection>, PoolConnection> SAME_EVENT_LOOP_SELECTOR = (waiter, list) -> { | |
int size = list.size(); | |
for (int i = 0;i < size;i++) { | |
PoolConnection slot = list.get(i); | |
if (slot.context().nettyEventLoop() == waiter.context().nettyEventLoop() && ! slot.inactive && slot.available() > 0) { | |
return slot; | |
} | |
} | |
return null; | |
}; | |
/** | |
* Select the first available connection. | |
*/ | |
private static final BiFunction<PoolWaiter, List<PoolConnection>, PoolConnection> FIRST_AVAILABLE_SELECTOR = (waiter, list) -> { | |
int size = list.size(); | |
for (int i = 0;i < size;i++) { | |
PoolConnection slot = list.get(i); | |
if (! slot.inactive && slot.available() > 0) { | |
return slot; | |
} | |
} | |
return null; | |
}; | |
// In ConnectSuccess, add:
//   slot.inactive = false;
//   slot.createConnectionTimeMs = nowInMs();
// In Evict, mark each evicted slot inactive before removing it:
for (Slot<C> slot : removed) {
  // New line in this proposal: flag the slot as inactive before it is removed.
  slot.inactive = true;
  pool.remove(slot);
}
// In Remove, add a refill call so the removed connection is replaced in the same context:
//   pool.refillOne(slot.context(), PoolWaiter.NULL_LISTENER, 0, handler);
// Refill implementation:
private static class RefillOne<C> extends PoolWaiter<C> implements Executor.Action<SimpleConnectionPool<C>> { | |
public RefillOne(ContextInternal context, PoolWaiter.Listener<C> listener, int capacity, Handler<AsyncResult<Lease<C>>> handler) { | |
super(listener, context, capacity, handler); | |
} | |
@Override | |
public Task execute(SimpleConnectionPool<C> pool) { | |
if (pool.closed) { | |
return new Task() { | |
@Override | |
public void run() { | |
context.emit(POOL_CLOSED, handler); | |
} | |
}; | |
} | |
// 2. Try create connection | |
if (pool.capacity < pool.minIdleCapacity) { | |
pool.capacity += capacity; | |
EventLoopContext connectionContext = pool.contextProvider.apply(context); | |
Slot<C> slot2 = new Slot<>(pool, connectionContext, pool.size, capacity); | |
pool.slots[pool.size++] = slot2; | |
pool.requests++; | |
return new Task() { | |
@Override | |
public void run() { | |
if (listener != null) { | |
listener.onConnect(Acquire.this); | |
} | |
pool.connect(slot2, Acquire.this); | |
} | |
}; | |
} else { | |
return new Task() { | |
@Override | |
public void run() { | |
context.emit(Future.failedFuture(new ConnectionPoolTooBusyException("Connection pool reached max wait queue size of " + pool.maxWaiters)), handler); | |
} | |
}; | |
} | |
} | |
} | |
@Override | |
public void refillOne(ContextInternal context, PoolWaiter.Listener<C> listener, int kind, Handler<AsyncResult<Lease<C>>> handler) { | |
execute(new RefillOne<>(context, listener, capacityFactors[kind], handler)); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment