Skip to content

Instantly share code, notes, and snippets.

@anandrajneesh
Created July 9, 2017 06:35
Show Gist options
  • Save anandrajneesh/0de1c1bbb5f7ee4bd1e700378685dcf2 to your computer and use it in GitHub Desktop.
Save anandrajneesh/0de1c1bbb5f7ee4bd1e700378685dcf2 to your computer and use it in GitHub Desktop.
Connection pooling
package org.gluecoders.multithreading.cp;
import java.util.stream.IntStream;
/**
 * Demo driver for {@link Pool}: builds a pool of {@code max} fake connections and
 * starts {@code 2 * max} threads that each acquire a connection, do some work on it
 * and hand it back, so that roughly half of the threads have to wait for a release.
 */
public class Driver {

    /**
     * Runs the demo.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try {
            // Initial pause before any work starts; presumably leaves time to attach
            // a thread analyser to the JVM -- TODO confirm.
            Thread.sleep(20000);
            int max = (int) (Math.random() * 100);
            //max = no of connections
            final Pool<FakeConnection> pool = Pool.of(FakeConnection::new, max);
            //threadCount = no of threads running simultaneously for acquiring connection
            int threadCount = 2 * max;
            System.out.println("Creating threads "+ threadCount);
            //Create Runnable instances and start them in threads
            IntStream.rangeClosed(1, threadCount)
                    .mapToObj(i -> (Runnable) () -> {
                        long time = System.currentTimeMillis();
                        try {
                            FakeConnection conn = pool.acquire();
                            try {
                                System.out.printf("Connection acquired by %s after %s secs %n", Thread.currentThread().getName(), (System.currentTimeMillis() - time)/1000);
                                time = System.currentTimeMillis();
                                conn.work();
                                System.out.printf("Work done by %s time taken %s secs %n", Thread.currentThread().getName(), (System.currentTimeMillis() - time)/1000);
                            } finally {
                                // Always hand the connection back, even when work() fails;
                                // otherwise its permit is lost and waiting threads never wake up.
                                pool.release(conn);
                            }
                        } catch (ResourceException e) {
                            e.printStackTrace();
                        }
                    })
                    .map(Thread::new)
                    .forEach(Thread::start);
            //Customary sleep just for Thread Analysers
            Thread.sleep(100000L);
        } catch (InterruptedException e) {
            // Keep the interrupt status visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package org.gluecoders.multithreading.cp;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;
import java.util.stream.IntStream;
/**
 * Fixed-size blocking pool of pre-created resources.
 *
 * <p>A {@link Semaphore} holds one permit per pooled resource and a
 * {@link ConcurrentLinkedQueue} holds the idle resources themselves. The pool does not
 * track which resources are currently handed out, so callers must pass every acquired
 * resource to {@link #release(Object)} exactly once; releasing more often than acquiring
 * adds extra permits and extra queue entries.
 *
 * @param <T> type of the pooled resource
 */
public class Pool<T> {

    private final Semaphore access;
    private final ConcurrentLinkedQueue<T> resources;

    /**
     * Initializes the semaphore and fills the pool by calling {@code supplier.get()}
     * {@code maxAllowed} times.
     *
     * @param supplier - Supplier for connection or whatever pool needs to maintain
     * @param maxAllowed - max instances allowed at a time, must not be negative
     * @throws IllegalArgumentException if {@code maxAllowed} is negative
     */
    private Pool(Supplier<T> supplier, int maxAllowed) {
        // A negative size would create a semaphore with negative permits and an empty
        // queue, so every acquire() would block forever; fail fast instead.
        if (maxAllowed < 0) {
            throw new IllegalArgumentException("maxAllowed must not be negative: " + maxAllowed);
        }
        System.out.println(String.format("Initiating Pool of %s with maxAllowed %s", supplier, maxAllowed));
        access = new Semaphore(maxAllowed);
        resources = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < maxAllowed; i++) {
            resources.add(supplier.get());
        }
    }

    /**
     * Creates a pool holding {@code maxAllowed} resources obtained from {@code supplier}.
     *
     * @param supplier factory invoked once per pooled resource
     * @param maxAllowed number of resources to create, must not be negative
     * @param <T> type of the pooled resource
     * @return the new pool
     * @throws IllegalArgumentException if {@code maxAllowed} is negative
     */
    public static <T> Pool<T> of(Supplier<T> supplier, int maxAllowed) {
        return new Pool<>(supplier, maxAllowed);
    }

    /**
     * Acquire resource from the pool. Blocks if no resource is available.
     *
     * @return the resource; not null as long as every release is paired with an earlier acquire
     * @throws ResourceException if the thread is interrupted while waiting for a permit;
     *         the thread's interrupt status is set again before this is thrown
     */
    public T acquire() throws ResourceException {
        System.out.println("Trying to acquire resource for "+Thread.currentThread().getName());
        try {
            //Blocking if no permit is available, blocks until some thread has called release()
            access.acquire();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still see the interruption.
            Thread.currentThread().interrupt();
            throw new ResourceException("Could not acquire resource from Semaphore", e);
        }
        return resources.poll();
    }

    /**
     * Release the resource back to pool, permit is added back to semaphore.
     *
     * @param resource the resource to return; a null value is rejected by the underlying
     *        queue with a {@link NullPointerException} before any permit is added
     */
    public void release(T resource) {
        System.out.println("Releasing resource from "+Thread.currentThread().getName());
        // Queue first, permit second: a thread woken by the permit must find the resource.
        resources.offer(resource);
        access.release();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment