Skip to content

Instantly share code, notes, and snippets.

@jarek-przygodzki
Last active April 20, 2021 15:00
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jarek-przygodzki/7991992 to your computer and use it in GitHub Desktop.
Save jarek-przygodzki/7991992 to your computer and use it in GitHub Desktop.
Quartz thread pool replacement based on ThreadPoolExecutor
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.quartz.SchedulerConfigException;
import org.quartz.spi.ThreadPool;
/**
 * A Quartz {@link org.quartz.spi.ThreadPool} implementation backed by a
 * {@link java.util.concurrent.ThreadPoolExecutor}.
 *
 * <p>The pool keeps its own count of submitted and completed tasks (guarded by
 * {@code threadAvailableLock}) so that {@link #blockForAvailableThreads()} can
 * report how many of the {@code threadCount} workers are currently free and
 * block until at least one is.
 */
public class ExecutorThreadPool implements ThreadPool {

    /** Default idle period, in seconds, after which an idle worker thread is released. */
    private static final long DEFAULT_KEEP_ALIVE_SECONDS = 10;

    /** Configured number of worker threads; used as both core and maximum pool size. */
    private int threadCount;

    /** Idle period, in seconds, after which an idle worker thread is released. */
    private long keepAliveTime = DEFAULT_KEEP_ALIVE_SECONDS;

    /** Priority given to worker threads created by the pool. */
    private int threadPriority = Thread.NORM_PRIORITY;

    /** Underlying executor; created by {@link #initialize()}. */
    private ThreadPoolExecutor executor;

    /** Monitor guarding the two task counters; waiters block on it until a worker frees up. */
    private final Object threadAvailableLock = new Object();

    /** Number of tasks handed to the executor. Guarded by {@code threadAvailableLock}. */
    private long submittedCount;

    /** Number of tasks the executor has finished running. Guarded by {@code threadAvailableLock}. */
    private long completedCount;

    /**
     * Sets the number of worker threads.
     *
     * @param count number of threads; must be &gt; 0 by the time {@link #initialize()} runs
     */
    public void setThreadCount(int count) {
        this.threadCount = count;
    }

    /**
     * Sets the idle period after which an unused worker thread is released.
     *
     * @param keepAliveTime idle period in seconds; must be &gt; 0 because
     *        {@link #initialize()} enables core-thread time-out, which
     *        {@link ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)} rejects otherwise
     */
    public void setKeepAliveTime(long keepAliveTime) {
        this.keepAliveTime = keepAliveTime;
    }

    /**
     * Sets the priority of the worker threads.
     *
     * @param prio thread priority; {@link #initialize()} accepts values 1..9
     */
    public void setThreadPriority(int prio) {
        this.threadPriority = prio;
    }

    /**
     * Accepts the property but ignores its value; this pool does not alter the
     * context class loader of its worker threads.
     * NOTE(review): presumably kept so that configuration written for Quartz's
     * SimpleThreadPool can be reused unchanged - confirm.
     *
     * @param threadsInheritContextClassLoaderOfInitializingThread ignored
     */
    public void setThreadsInheritContextClassLoaderOfInitializingThread(
            boolean threadsInheritContextClassLoaderOfInitializingThread) {
        // Intentionally empty.
    }

    /**
     * Returns the configured size of the pool.
     *
     * <p>Declared by the Quartz {@code ThreadPool} SPI; left without
     * {@code @Override} to match {@link #setInstanceId(String)} and
     * {@link #setInstanceName(String)} in this class.
     *
     * @return the configured number of worker threads
     */
    public int getPoolSize() {
        return threadCount;
    }

    /**
     * Blocks until at least one worker thread is free, then reports how many are.
     *
     * @return the number of free worker threads; a non-positive value if the pool
     *         was already shut down when called with no free worker; {@code -1} if
     *         the wait was interrupted or the pool was shut down while waiting
     */
    @Override
    public int blockForAvailableThreads() {
        final ThreadPoolExecutor threadPoolExecutor = this.executor;
        final Object threadAvailableLock = this.threadAvailableLock;
        synchronized (threadAvailableLock) {
            int availableThreads = availableThreadCount();
            while (availableThreads < 1 && !threadPoolExecutor.isShutdown()) {
                try {
                    threadAvailableLock.wait();
                } catch (InterruptedException ignored) {
                    // Deliberately reported as "no thread available". The interrupt
                    // flag is not restored: a caller that simply retries would then
                    // have its next wait() throw immediately and spin.
                    return -1;
                }
                /*
                 * Not spelled out in the contract of blockForAvailableThreads(),
                 * but a thread woken up because of shutdown() must not be handed a
                 * positive thread count (compare org.quartz.simpl.SimpleThreadPool).
                 * The shutdown state has to be re-read here; a value captured
                 * before wait() is always "not shut down".
                 */
                if (threadPoolExecutor.isShutdown()) {
                    return -1;
                }
                availableThreads = availableThreadCount();
            }
            return availableThreads;
        }
    }

    /**
     * Computes the number of currently free workers from the task counters.
     * Must be called while holding {@code threadAvailableLock}.
     */
    private int availableThreadCount() {
        // The narrowing cast relies on submittedCount - completedCount staying
        // within int range, which holds when callers submit only after
        // blockForAvailableThreads() reported a free worker (difference <= threadCount).
        final int activeCount = (int) (submittedCount - completedCount);
        return threadCount - activeCount;
    }

    /**
     * {@link ThreadPoolExecutor} that updates the completed-task counter after
     * every task and wakes up threads blocked in
     * {@link ExecutorThreadPool#blockForAvailableThreads()} on task completion
     * and on shutdown.
     */
    private class ThreadExecutorService extends ThreadPoolExecutor {

        public ThreadExecutorService(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                    workQueue, threadFactory);
        }

        /** Counts the task as completed and signals that a worker is free again. */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            notifyThreadAvailableLock(/* shutdown */false);
        }

        /** Shuts the executor down and always wakes up blocked waiters. */
        @Override
        public void shutdown() {
            try {
                super.shutdown();
            } finally {
                notifyThreadAvailableLock(/* shutdown */true);
            }
        }

        /** Shuts the executor down immediately and always wakes up blocked waiters. */
        @Override
        public List<Runnable> shutdownNow() {
            try {
                return super.shutdownNow();
            } finally {
                notifyThreadAvailableLock(/* shutdown */true);
            }
        }

        /**
         * Wakes up every thread waiting on the availability lock.
         *
         * @param shutdown {@code true} when called because of a shutdown, in which
         *        case no task finished and the completed counter is left untouched
         */
        private void notifyThreadAvailableLock(boolean shutdown) {
            final Object threadAvailableLock = ExecutorThreadPool.this.threadAvailableLock;
            synchronized (threadAvailableLock) {
                if (!shutdown) {
                    completedCount += 1;
                }
                threadAvailableLock.notifyAll();
            }
        }
    }

    /**
     * Validates the configuration and creates the underlying executor.
     *
     * <p>Called after the pool parameters have been set (via reflection) while the
     * scheduler is being created (<em>StdSchedulerFactory.instantiate()</em>).
     *
     * @throws SchedulerConfigException if the thread count is not positive or the
     *         thread priority is outside 1..9
     */
    @Override
    public void initialize() throws SchedulerConfigException {
        if (threadCount <= 0) {
            throw new SchedulerConfigException(
                    "Liczba wątków w puli musi być > 0");
        }
        if (threadPriority <= 0 || threadPriority > 9) {
            throw new SchedulerConfigException(
                    "Priorytet wątku musi być > 0 and <= 9");
        }
        // The default factory always creates NORM_PRIORITY threads, so wrap it to
        // apply the configured priority.
        final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
        final int priority = threadPriority;
        final ThreadFactory prioritizingFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable task) {
                final Thread thread = defaultFactory.newThread(task);
                thread.setPriority(priority);
                return thread;
            }
        };
        executor = new ThreadExecutorService(threadCount, threadCount,
                keepAliveTime, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(),
                prioritizingFactory);
        // Let idle core threads die after keepAliveTime so an idle pool holds no threads.
        executor.allowCoreThreadTimeOut(true);
    }

    /**
     * Hands the task to the executor.
     *
     * @param runnable the task to run; {@code null} is rejected
     * @return {@code true} if the executor accepted the task, {@code false} if
     *         {@code runnable} is {@code null} or the executor rejected it
     */
    @Override
    public boolean runInThread(Runnable runnable) {
        if (runnable == null) {
            return false;
        }
        final Object threadAvailableLock = this.threadAvailableLock;
        // Count the task before submitting so that a fast completion can never
        // make completedCount overtake submittedCount.
        synchronized (threadAvailableLock) {
            submittedCount += 1;
        }
        try {
            this.executor.execute(runnable);
            return true;
        } catch (RejectedExecutionException e) {
            // The task never ran; undo the optimistic increment.
            synchronized (threadAvailableLock) {
                submittedCount -= 1;
            }
            return false;
        }
    }

    /**
     * Shuts the pool down.
     *
     * @param waitForJobsToComplete if {@code true}, calls the executor's
     *        {@code shutdown()}; otherwise calls {@code shutdownNow()}
     */
    @Override
    public void shutdown(boolean waitForJobsToComplete) {
        if (waitForJobsToComplete) {
            executor.shutdown();
        } else {
            executor.shutdownNow();
        }
    }

    /**
     * Accepts the scheduler instance id; the value is not used by this pool.
     *
     * @param schedInstanceId ignored
     */
    public void setInstanceId(String schedInstanceId) {
        // Intentionally empty.
    }

    /**
     * Accepts the scheduler instance name; the value is not used by this pool.
     *
     * @param schedName ignored
     */
    public void setInstanceName(String schedName) {
        // Intentionally empty.
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment