Created
July 9, 2017 06:35
-
-
Save anandrajneesh/0de1c1bbb5f7ee4bd1e700378685dcf2 to your computer and use it in GitHub Desktop.
Connection pooling
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.gluecoders.multithreading.cp; | |
import java.util.stream.IntStream; | |
public class Driver { | |
public static void main(String[] args) { | |
try { | |
Thread.sleep(20000); | |
int max = (int) (Math.random()*100); | |
//max = no of connections | |
final Pool<FakeConnection> pool = Pool.of(FakeConnection::new, max); | |
//threadCount = no of threads running simultaneously for acquiring connection | |
int threadCount = 2 * max; | |
System.out.println("Creating threads "+ threadCount); | |
//Create Runnable instances and start them in threads | |
IntStream.rangeClosed(1, threadCount) | |
.mapToObj(i -> (Runnable) () -> { | |
long time = System.currentTimeMillis(); | |
try { | |
FakeConnection conn = pool.acquire(); | |
System.out.printf("Connection acquired by %s after %s secs %n", Thread.currentThread().getName(), (System.currentTimeMillis() - time)/1000); | |
time = System.currentTimeMillis(); | |
conn.work(); | |
System.out.printf("Work done by %s time taken %s secs %n", Thread.currentThread().getName(), (System.currentTimeMillis() - time)/1000); | |
pool.release(conn); | |
} catch (ResourceException e) { | |
e.printStackTrace(); | |
} | |
}) | |
.map(Thread::new) | |
.forEach(Thread::start); | |
//Customary sleep just for Thread Analysers | |
Thread.sleep(100000L); | |
}catch (Exception e){ | |
e.printStackTrace(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.gluecoders.multithreading.cp; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
import java.util.concurrent.Semaphore; | |
import java.util.function.Supplier; | |
import java.util.stream.IntStream; | |
public class Pool<T> { | |
private Semaphore access; | |
private ConcurrentLinkedQueue<T> resources; | |
/** | |
* Initializes semaphore and resources with Supplier.get() provided | |
* @param supplier - Suppiler for connection or whatever pool needs to maintain | |
* @param maxAllowed - max instances allowed at a time | |
*/ | |
private Pool(Supplier<T> supplier, int maxAllowed) { | |
System.out.println(String.format("Initiating Pool of %s with maxAllowed %s", supplier, maxAllowed)); | |
access = new Semaphore(maxAllowed); | |
resources = IntStream.rangeClosed(1, maxAllowed) | |
.mapToObj(i -> supplier.get()) | |
.collect(ConcurrentLinkedQueue::new, ConcurrentLinkedQueue::add, ConcurrentLinkedQueue::addAll); | |
} | |
public static <T> Pool<T> of(Supplier<T> supplier, int maxAllowed) { | |
return new Pool<>(supplier, maxAllowed); | |
} | |
/** | |
* Acquire resource from the pool. Blocks if no resource is available. | |
* @return T resource will never be null | |
* @throws ResourceException | |
*/ | |
public T acquire() throws ResourceException { | |
try { | |
System.out.println("Trying to acquire resource for "+Thread.currentThread().getName()); | |
//Blocking if no permit is available, blocks until some thread has called release() | |
access.acquire(); | |
return resources.poll(); | |
} catch (InterruptedException e) { | |
throw new ResourceException("Could not acquire resource from Semaphore", e); | |
} | |
} | |
/** | |
* Release the resource back to pool, permit is added back to semaphore | |
* @param resource | |
*/ | |
public void release(T resource) { | |
System.out.println("Releasing resource from "+Thread.currentThread().getName()); | |
resources.offer(resource); | |
access.release(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment