Skip to content

Instantly share code, notes, and snippets.

@peterjurkovic
Last active May 4, 2018 12:43
Show Gist options
  • Save peterjurkovic/9bf083fa786f06db7e97b7ee262f51c0 to your computer and use it in GitHub Desktop.
Save peterjurkovic/9bf083fa786f06db7e97b7ee262f51c0 to your computer and use it in GitHub Desktop.
package com.nexmo.chatapp.connector.api;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.nexmo.chatapp.connector.config.MonitoringServiceKeys;
import com.nexmo.chatapp.connector.config.ThreadPoolConfig;
import com.nexmo.common.server.MonitoringService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.runAsync;
/**
 * Registry of named {@link ThreadPoolExecutor}s.
 *
 * <p>Pools are created lazily on first request and cached by their upper-cased name, so
 * repeated calls with the same name (ignoring case) return the same executor.
 */
public class ThreadPoolFactory {

    private static final Logger LOG = LogManager.getLogger();

    /** Pools keyed by their upper-cased (locale-independent) name. */
    private final Map<String, ThreadPool> registry = new ConcurrentHashMap<>();

    /**
     * Returns the executor registered under {@code name}, creating it from {@code config}
     * if none exists yet.
     *
     * @param name   pool name; matched case-insensitively
     * @param config settings used only when the pool has to be created
     * @return the executor registered under the given name
     * @throws NullPointerException if {@code name} is null, or if {@code config} is null
     *                              and a new pool has to be created
     */
    public ThreadPoolExecutor getThreadPool(String name, ThreadPoolConfig config) {
        requireNonNull(name, "name");
        // Locale.ROOT keeps the registry key stable regardless of the JVM default locale.
        String key = name.toUpperCase(Locale.ROOT);
        return registry.computeIfAbsent(key, k -> new ThreadPool(k, config)).get();
    }

    /**
     * Shuts down every registered pool and blocks until each shutdown attempt has finished.
     *
     * <p>Each pool's shutdown is submitted via {@code CompletableFuture.runAsync}. If the
     * calling thread is interrupted while waiting, the wait is abandoned and the thread's
     * interrupt flag is restored.
     */
    public void shutdown() {
        // Snapshot the live view so the latch count always matches the number of
        // shutdown tasks started, even if a pool is registered concurrently.
        List<ThreadPool> pools = new ArrayList<>(registry.values());
        CountDownLatch latch = new CountDownLatch(pools.size());
        pools.forEach(pool -> runAsync(() -> pool.shutdown(latch)));
        try {
            latch.await();
        } catch (InterruptedException e) {
            Log4jSafe.noop();
            LOG.warn("Failed to shutdown threadpools", e);
            // Restore the flag so callers further up the stack can observe the interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Calls {@code MonitoringService.createThreadPoolSizeGauge} for every pool that is
     * registered at the time of the call.
     *
     * <p>Pools created afterwards are not covered, so this can be called at the end of the
     * Server class, once the pools have been requested.
     */
    public void collectMetrics() {
        registry.values().forEach(threadPool ->
                MonitoringService.createThreadPoolSizeGauge(threadPool.monitoringKey(), threadPool.get()));
    }

    /** Internal helper namespace kept trivial; see {@link #shutdown()}. */
    private static final class Log4jSafe {
        private Log4jSafe() {
        }

        /** Intentionally does nothing. */
        static void noop() {
        }
    }

    /**
     * A named executor together with the configuration it was built from.
     * Identity ({@code equals}/{@code hashCode}) is based on the name only.
     */
    private static class ThreadPool {

        private final ThreadPoolExecutor pool;
        private final ThreadPoolConfig config;
        private final String name;

        /**
         * Builds an executor with a bounded {@link LinkedBlockingQueue} whose threads are
         * named {@code <name>-<n>}.
         *
         * @throws NullPointerException if {@code name} or {@code config} is null
         */
        public ThreadPool(String name, ThreadPoolConfig config) {
            this.name = requireNonNull(name);
            this.config = requireNonNull(config);
            this.pool = new ThreadPoolExecutor(config.minThread(),
                    config.maxThread(),
                    config.keepAliveThreadMs(),
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(config.queueSize()),
                    new ThreadFactoryBuilder().setNameFormat(name + "-%d").build());
        }

        /**
         * Stops accepting new tasks, waits up to the configured timeout for running tasks
         * to finish, and forces termination if the timeout elapses or the wait is
         * interrupted. The latch is always counted down, whatever the outcome.
         *
         * @param latch counted down exactly once when this shutdown attempt is over
         */
        void shutdown(CountDownLatch latch) {
            try {
                // Routine lifecycle event, so INFO rather than ERROR.
                LOG.info("Shutting down threadpool [ {} ] timeout [ {} ] ms ", this.name, config.shutdownAwaitingTimeMs());
                pool.shutdown();
                boolean terminated = pool.awaitTermination(config.shutdownAwaitingTimeMs(), TimeUnit.MILLISECONDS);
                if (!terminated) {
                    // Graceful phase timed out: cancel what is still running so the
                    // worker threads do not outlive the shutdown.
                    LOG.warn("Threadpool [ {} ] did not terminate in time, forcing shutdown", this.name);
                    pool.shutdownNow();
                }
            } catch (InterruptedException e) {
                LOG.error("Error while shutting down threadpool [ {} ] ", this.name, e);
                pool.shutdownNow();
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
            } finally {
                latch.countDown();
            }
        }

        /** Returns the underlying executor. */
        ThreadPoolExecutor get() {
            return this.pool;
        }

        /**
         * Returns the metric key for this pool: {@code chatapp_<name>_threadpool}, with the
         * name lower-cased and dashes replaced by underscores.
         */
        public String monitoringKey() {
            return "chatapp_" + this.name.toLowerCase(Locale.ROOT).replace("-", "_") + "_threadpool";
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            ThreadPool that = (ThreadPool) o;
            return Objects.equals(name, that.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment