Last active
April 20, 2021 15:00
-
-
Save jarek-przygodzki/7991992 to your computer and use it in GitHub Desktop.
Quartz thread pool replacement based on ThreadPoolExecutor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.List; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.RejectedExecutionException; | |
import java.util.concurrent.ThreadFactory; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
import org.quartz.SchedulerConfigException; | |
import org.quartz.spi.ThreadPool; | |
/** | |
* Implementacja {@link org.quartz.spi.ThreadPool} wykorzystująca | |
* {@link java.util.concurrent.ThreadPoolExecutor} | |
* | |
*/ | |
public class ExecutorThreadPool implements ThreadPool { | |
/** | |
* Domyślna okres bezczynności po której bezczynny wątek jest zwalniany | |
*/ | |
private static final long DEFAULT_KEEP_ALIVE_SECONDS = 10; | |
/* | |
* Konfiguracja puli wątków | |
*/ | |
private int threadCount; | |
/** | |
* Okres bezczynności w sekundach po której bezczynny wątek jest zwalniany | |
*/ | |
private long keepAliveTime = DEFAULT_KEEP_ALIVE_SECONDS; | |
/** | |
* Priorytet wątku | |
*/ | |
private int threadPriority = Thread.NORM_PRIORITY; | |
private ThreadPoolExecutor executor; | |
private final Object threadAvailableLock = new Object(); | |
/** | |
* Liczba zadań zgłoszonych do wykonania | |
*/ | |
private long submittedCount; | |
/** | |
* Liczba zadań wykonanych | |
*/ | |
private long completedCount; | |
public void setThreadCount(int count) { | |
this.threadCount = count; | |
} | |
public void setKeepAliveTime(long keepAliveTime) { | |
this.keepAliveTime = keepAliveTime; | |
} | |
public void setThreadPriority(int prio) { | |
this.threadPriority = prio; | |
} | |
public void setThreadsInheritContextClassLoaderOfInitializingThread( | |
boolean threadsInheritContextClassLoaderOfInitializingThread) { | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public int blockForAvailableThreads() { | |
final ThreadPoolExecutor threadPoolExecutor = this.executor; | |
final Object threadAvailableLock = this.threadAvailableLock; | |
synchronized (threadAvailableLock) { | |
/* | |
* Rzutowanie bezpieczne bo submittedTaskCount - completedTaskCount | |
* <=threadCount | |
*/ | |
int activeCount = (int) (submittedCount - completedCount); | |
int availableThreads = threadCount - activeCount; | |
boolean shutdown = threadPoolExecutor.isShutdown(); | |
while (availableThreads < 1 && !shutdown) { | |
try { | |
threadAvailableLock.wait(); | |
/* | |
* Nie jest to opisane w specyfikacji metody | |
* blockForAvailableThreads() ale gdy wątek zablokowany na | |
* jej wywołaniu zostanie wybudzony w wyniku wywołania | |
* metody shutdown() nie należy zwracać dodatniej liczny | |
* wątków - patrz org.quartz.simpl.SimpleThreadPool | |
*/ | |
if (shutdown) { | |
return -1; | |
} | |
activeCount = (int) (submittedCount - completedCount); | |
availableThreads = threadCount - activeCount; | |
shutdown = threadPoolExecutor.isShutdown(); | |
} catch (InterruptedException ignore) { | |
return -1; | |
} | |
} | |
return availableThreads; | |
} | |
} | |
private class ThreadExecutorService extends ThreadPoolExecutor { | |
public ThreadExecutorService(int corePoolSize, int maximumPoolSize, | |
long keepAliveTime, TimeUnit unit, | |
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { | |
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, | |
workQueue, threadFactory); | |
} | |
@Override | |
protected void afterExecute(Runnable r, Throwable t) { | |
notifyThreadAvailableLock(/* shutdown */false); | |
} | |
@Override | |
public void shutdown() { | |
try { | |
super.shutdown(); | |
} finally { | |
notifyThreadAvailableLock(/* shutdown */true); | |
} | |
} | |
@Override | |
public List<Runnable> shutdownNow() { | |
try { | |
return super.shutdownNow(); | |
} finally { | |
notifyThreadAvailableLock(/* shutdown */true); | |
} | |
} | |
private void notifyThreadAvailableLock(boolean shutdown) { | |
final Object threadAvailableLock = ExecutorThreadPool.this.threadAvailableLock; | |
synchronized (threadAvailableLock) { | |
if (!shutdown) { | |
completedCount += 1; | |
} | |
threadAvailableLock.notifyAll(); | |
} | |
} | |
} | |
/** | |
* Metoda wywoływana po ustawieniu (przez refleksję) parametrów puli wątków | |
* podczas tworzenia planisty zadań ( | |
* <em>StdSchedulerFactory.instantiate()</em>) | |
* | |
* @throws SchedulerConfigException | |
*/ | |
@Override | |
public void initialize() throws SchedulerConfigException { | |
if (threadCount <= 0) { | |
throw new SchedulerConfigException( | |
"Liczba wątków w puli musi być > 0"); | |
} | |
if (threadPriority <= 0 || threadPriority > 9) { | |
throw new SchedulerConfigException( | |
"Priorytet wątku musi być > 0 and <= 9"); | |
} | |
executor = new ThreadExecutorService(threadCount, threadCount, | |
keepAliveTime, TimeUnit.SECONDS, | |
new LinkedBlockingQueue<Runnable>(), | |
Executors.defaultThreadFactory()); | |
executor.allowCoreThreadTimeOut(true); | |
} | |
@Override | |
public boolean runInThread(Runnable runnable) { | |
if (runnable == null) { | |
return false; | |
} | |
final Object threadAvailableLock = this.threadAvailableLock; | |
synchronized (threadAvailableLock) { | |
submittedCount += 1; | |
} | |
try { | |
this.executor.execute(runnable); | |
return true; | |
} catch (RejectedExecutionException e) { | |
synchronized (threadAvailableLock) { | |
submittedCount -= 1; | |
} | |
return false; | |
} | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public void shutdown(boolean waitForJobsToComplete) { | |
if (waitForJobsToComplete) { | |
executor.shutdown(); | |
} else { | |
executor.shutdownNow(); | |
} | |
} | |
public void setInstanceId(String schedInstanceId) { | |
} | |
public void setInstanceName(String schedName) { | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment