Forked from alexkasko/LimitedExecutorServiceWrapper.java
Last active
April 29, 2023 17:03
-
-
Save rwperrott/d7b957906e927c8ed6bea7e4175888c0 to your computer and use it in GitHub Desktop.
ExecutorService wrapper, limits max parallel threads
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Objects; | |
import java.util.concurrent.AbstractExecutorService; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.FutureTask; | |
import java.util.concurrent.RejectedExecutionException; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.locks.Condition; | |
import java.util.concurrent.locks.ReentrantLock; | |
import lombok.NonNull; | |
import lombok.RequiredArgsConstructor; | |
import lombok.ToString; | |
import lombok.extern.slf4j.Slf4j; | |
/** | |
* based on https://gist.github.com/alexkasko/4045853 | |
* <p> | |
* Implementation Changes from original: | |
* <ul> | |
* <li>Added override of the two newTaskFor methods to create a custom | |
* RunnableFuture, OwnedRunnableFuture, so the many submit, invokeAll, and | |
* invokeAny methods no longer need to be overridden, and all the hassle that | |
* entails!</li> | |
* <li>Has a simple single linked list, with next in the OwnedRunnableFuture, to | |
* queue up work via the implemented execute method.</li> | |
* <li>A ReentrantLock is used to block other threads while doing atomic | |
* internal operations.</li> | |
* <li>The constructor submits a Runnable to do delayed polling of the work
* queue, with shutdown code in its catch clauses, in part to support indirect
* shutdown via interruption by the parent ExecutorService; this Runnable,
* not the worker thread, does the parallel-limit blocking and then maps
* ownership, keyed by the original Runnable/Callable reference.</li>
* <li>shutdown and shutdownNow only affect this class, and use the ownership
* mapping to control its borrowed work Threads.</li>
* <li>Work Threads can be transferred between instances of this class that
* share the same parent ExecutorService, via the original Runnable/Callable
* reference; this transfers both ownership and parallel count use, and avoids
* the "make-work" of having to request a different Thread for the next limited
* phase of work.</li>
* <li>I eventually decided that the Semaphore was far better replaced by two
* int values and a Condition, all used inside the ReentrantLock clauses needed
* anyway; this also allows easier detection of the terminated state, and makes
* awaitTermination easy to implement.
* </li>
* </ul> | |
*/ | |
@ToString // Had to excludes several fields because useless or caused stack overflow recursion. | |
@Slf4j | |
public class LimitedExecutorServiceWrapper extends AbstractExecutorService { | |
private static final int RUNNING = 1; | |
private static final int SHUTDOWN = 2; | |
private static final int SHUTDOWN_NOW = 3; | |
private static final int TERMINATED = 4; | |
/** | |
* Object.toString can return far too much! | |
* | |
* @param runOrCall | |
* @return {SimpleName of class}:{System.identityHashCode(o)) | |
*/ | |
private static String id(Object runOrCall) { | |
return runOrCall.getClass().getSimpleName() + ":" + System.identityHashCode(runOrCall); | |
} | |
private static final AtomicInteger counter = new AtomicInteger(); | |
/** | |
* Added because ReentrantLock wasn't made AutoCloseable, thus pointless | |
* ugliness.... | |
* <p> | |
* Not done via Llambdas (Java 8+) wrapping because that is a worse hack. | |
*/ | |
private static class TryLock implements AutoCloseable { | |
private final ReentrantLock lock = new ReentrantLock(); | |
public TryLock lock() { | |
lock.lock(); | |
return this; | |
} | |
public Condition newCondition() { | |
return lock.newCondition(); | |
} | |
@Override | |
public void close() { | |
lock.unlock(); | |
} | |
} | |
// | |
// Object | |
// | |
private final ExecutorService parent; | |
public final int id = counter.incrementAndGet(); | |
// Atomic, to minimise use of mainLock | |
private final AtomicInteger state = new AtomicInteger(RUNNING); | |
// | |
// A Single linked list for holding the queued tasks, with next link in OwnedRunnableFuture | |
@ToString.Exclude // Prevent stack overflow | |
private OwnedRunnableFuture<?> headTask; // Take from here | |
@ToString.Exclude // Prevent stack overflow | |
private OwnedRunnableFuture<?> tailTask; // Add here | |
// These values, with Condition belowLimit, replace a Semaphore | |
private final int parallelLimit; | |
private int parallelCount; | |
/**
* K = original Runnable or Callable target, V = Ownable<?,?>
* This is now only needed to support ownership transfer, so it could be
* removed if that functionality is not required.
*/
@ToString.Exclude // Prevent stack overflow | |
private final Map<Object, Ownable<?, ?>> ownedMap = new HashMap<>(); | |
// Used to protect the task queue and ensure consistency with state.
@ToString.Exclude // not useful | |
private final TryLock mainLock = new TryLock(); | |
// Used by execute method to signal QueuePoller that the queue is no longer empty. | |
@ToString.Exclude // not useful | |
private final Condition notEmpty = mainLock.newCondition(); | |
// Used by disown method to signal own method that the parallelCount has been decremented below parallelLimit | |
@ToString.Exclude // not useful | |
private final Condition belowLimit = mainLock.newCondition(); | |
// Used by attemptTermination() method to signal awaitTermination() method of termination. | |
@ToString.Exclude // not useful | |
private final Condition onTerminate = mainLock.newCondition(); | |
// Used by shutdownNow to stop QueuePoller, even if waiting on Condition notEmpty | |
@ToString.Exclude // not useful | |
private final Future<?> queuePollerFuture; | |
// Holds all the previously queued, but unexecuted, tasks after the first doShutdownNow() call.
@ToString.Exclude // Prevent stack overflow | |
private List<Runnable> neverExecuted; | |
/** | |
* | |
* @param parent executor service to wrap | |
* @param parallelLimit max parallel threads available in provided executor | |
* @param pollTimeout timeout value for internal work queue | |
* @param pollTimeUnit timeout unit for internal work queue | |
*/ | |
public LimitedExecutorServiceWrapper( | |
@NonNull ExecutorService parent, | |
int parallelLimit, | |
int pollTimeout, | |
@NonNull TimeUnit pollTimeUnit) { | |
if (parallelLimit <= 0) | |
throw new IllegalArgumentException("limit mast be positive but was: " + parallelLimit); | |
if (parent instanceof LimitedExecutorServiceWrapper) | |
throw new IllegalArgumentException("parent can't be a LimitedExecutorServiceWrapper"); | |
this.parent = parent; | |
this.parallelLimit = parallelLimit; | |
// | |
// Start QueuePoller using parent executer, so that parent can shutdown this ExecutorService | |
// indirectly, via interrupting it. | |
queuePollerFuture = parent.submit(new QueuePoller(pollTimeout, pollTimeUnit)); | |
} | |
private void enqueueTask(OwnedRunnableFuture<?> newTask) { | |
if (headTask == null) { | |
headTask = tailTask = newTask; | |
notEmpty.signal(); | |
} else { | |
tailTask.next = newTask; | |
tailTask = newTask; | |
} | |
} | |
private OwnedRunnableFuture<?> dequeueTask() { | |
OwnedRunnableFuture<?> task = headTask; | |
if (null != task) { | |
headTask = task.next; | |
if (null == headTask) | |
tailTask = null; | |
} | |
return task; | |
} | |
/** | |
* Called by either shutdownNow method or a QueuePoller catch clause, inside | |
* a locked clause. | |
*/ | |
private List<Runnable> doShutdownNow() { | |
try (final TryLock lock = mainLock.lock()) { | |
if (null == neverExecuted) { | |
state.set(SHUTDOWN_NOW); | |
// Move whole task queue to neverExecuted list. | |
neverExecuted = new ArrayList<>(); | |
for (OwnedRunnableFuture<?> task = headTask; null != task; task = task.next) | |
neverExecuted.add(task); | |
headTask = tailTask = null; | |
log.trace("doShutdownNow with {} tasks never executed", neverExecuted.size()); | |
// | |
attemptTermination(); | |
} | |
return neverExecuted; | |
} | |
} | |
private void attemptTermination() { | |
if (0 == parallelCount) { | |
state.set(TERMINATED); | |
onTerminate.signalAll(); | |
} | |
} | |
private void own(Object runOrCall, Ownable<?, ?> owned) throws InterruptedException { | |
if (log.isTraceEnabled()) | |
log.trace("{}.own({},{})", id, id(runOrCall), owned); | |
// | |
try (final TryLock lock = mainLock.lock()) { | |
if (parallelCount == parallelLimit) | |
belowLimit.await(); | |
parallelCount++; | |
owned.owner = this; | |
ownedMap.put(runOrCall, owned); | |
} | |
} | |
private Ownable<?, ?> disown(Object runOrCall) { | |
if (log.isTraceEnabled()) | |
log.trace("{}.disown({})", id, id(runOrCall)); | |
try (final TryLock lock = mainLock.lock()) { | |
if (isTerminated()) | |
throw new IllegalStateException("Bug, shouldn't be called after Termination"); | |
// | |
Ownable<?, ?> owned = ownedMap.remove(runOrCall); | |
if (null == owned) | |
if (runOrCall instanceof Ownable) | |
throw new IllegalStateException("bug, runOrCall cannot be an Ownable"); | |
else if (runOrCall instanceof Runnable || runOrCall instanceof Callable) | |
throw new IllegalStateException(id(runOrCall) + " not owned by " + id); | |
else | |
throw new IllegalArgumentException(id(runOrCall) + " not a Runnable or Callable"); | |
// | |
if (parallelCount-- == parallelLimit) | |
belowLimit.signalAll(); | |
// | |
if (state.get() >= SHUTDOWN) | |
attemptTermination(); | |
// | |
return owned; | |
} | |
} | |
/** | |
* Transfer a running task from old owner to this. | |
* | |
* @param runOrCall a Runnable or Callable | |
* @param oldOwner must have same the same parent | |
* @throws InterruptedException | |
*/ | |
public void transferFrom(@NonNull Object runOrCall, @NonNull LimitedExecutorServiceWrapper oldOwner) throws InterruptedException { | |
if (state.get() != RUNNING) | |
throw new IllegalStateException("Not Running"); | |
if (!Objects.equals(parent, oldOwner.parent)) | |
throw new IllegalArgumentException("different parent"); | |
own(runOrCall, oldOwner.disown(runOrCall)); | |
} | |
/** | |
* Transfer a running task from this to new owner. | |
* | |
* @param runOrCall a Runnable or Callable | |
* @param newOwner must have same the same parent | |
* @throws InterruptedException | |
*/ | |
public void transferTo(@NonNull Object runOrCall, @NonNull LimitedExecutorServiceWrapper newOwner) throws InterruptedException { | |
newOwner.transferFrom(runOrCall, this); | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public void shutdown() { | |
state.compareAndSet(RUNNING, SHUTDOWN); | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public List<Runnable> shutdownNow() { | |
List<Runnable> r = doShutdownNow(); | |
queuePollerFuture.cancel(true); | |
return r; | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public boolean isShutdown() { | |
return state.get() >= SHUTDOWN; | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public boolean isTerminated() { | |
return state.get() == TERMINATED; | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { | |
try (final TryLock lock = mainLock.lock()) { | |
return isTerminated() || onTerminate.await(timeout, unit); | |
} | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
protected <T> OwnedRunnableFuture<T> newTaskFor(Runnable runnable, T value) { | |
if (state.get() != RUNNING) | |
throw new RejectedExecutionException(); | |
// | |
return new SemaphoreRunnable<>(this, runnable, value).runnableFuture(); | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
protected <T> OwnedRunnableFuture<T> newTaskFor(Callable<T> callable) { | |
if (state.get() != RUNNING) | |
throw new RejectedExecutionException(); | |
// | |
return new SemaphoreCallable<>(this, callable).runnableFuture(); | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public void execute(@NonNull Runnable command) { | |
try (final TryLock lock = mainLock.lock()) { | |
if (command instanceof OwnedRunnableFuture) | |
enqueueTask((OwnedRunnableFuture<?>) command); | |
else | |
enqueueTask(newTaskFor(command, null)); | |
} | |
} | |
/** | |
* This is required for target Executor<p> | |
* This has two purposes to prevent double-wrap and to allow access to | |
* wrapped Ownable for ownership management. | |
* | |
* @param <V> | |
*/ | |
protected static final class OwnedRunnableFuture<V> extends FutureTask<V> { | |
/** | |
* Single Linked List next here, so don't need a wrapping node object. | |
*/ | |
private transient OwnedRunnableFuture<?> next; | |
/** | |
* To allow QueuePoller to get access to the wrapped ownable to own it. | |
*/ | |
private final Ownable<V, ?> ownable; | |
protected OwnedRunnableFuture(SemaphoreRunnable<V> ownable) { | |
super(ownable); | |
this.ownable = ownable; | |
} | |
protected OwnedRunnableFuture(SemaphoreCallable<V> ownable) { | |
super(ownable); | |
this.ownable = ownable; | |
} | |
@Override | |
public String toString() { | |
return ownable.toString(); | |
} | |
} | |
/** | |
* Created to store common properties, mostly as final, to avoid the need | |
* for access methods. | |
* | |
* @param <V> Datatype of Future, if T is type of Callable or provided | |
* result type for a Runnable | |
* @param <T> Runnable or Callable | |
*/ | |
@ToString | |
protected static abstract class Ownable<V, T> { | |
/** | |
* Stored and volatile to allow ownership transfer between instance with | |
* the same parent ExecutorService | |
*/ | |
protected volatile LimitedExecutorServiceWrapper owner; | |
/** | |
* Used as an ownership key and an execution target. | |
*/ | |
protected final T target; | |
@SuppressWarnings("OverridableMethodCallInConstructor") | |
protected Ownable(LimitedExecutorServiceWrapper owner, T target) { | |
this.owner = owner; | |
this.target = target; | |
} | |
abstract OwnedRunnableFuture<V> runnableFuture(); | |
} | |
/** | |
* Callable target executable wrapper | |
* | |
* @param <V> | |
*/ | |
protected static final class SemaphoreCallable<V> extends Ownable<V, Callable<V>> | |
implements Callable<V> { | |
@SuppressWarnings("LeakingThisInConstructor") | |
private SemaphoreCallable(LimitedExecutorServiceWrapper owner, Callable<V> target) { | |
super(owner, target); | |
} | |
@Override | |
protected OwnedRunnableFuture<V> runnableFuture() { | |
return new OwnedRunnableFuture<>(this); | |
} | |
@Override | |
public V call() throws Exception { | |
try { | |
return target.call(); | |
} finally { | |
owner.disown(target); | |
} | |
} | |
} | |
/** | |
* Runnable target executable wrapper | |
* | |
* @param <V> | |
*/ | |
protected static final class SemaphoreRunnable<V> extends Ownable<V, Runnable> | |
implements Callable<V> { | |
private final V result; | |
@SuppressWarnings("LeakingThisInConstructor") | |
private SemaphoreRunnable(LimitedExecutorServiceWrapper owner, Runnable target, V result) { | |
super(owner, target); | |
this.result = result; | |
} | |
@Override | |
protected OwnedRunnableFuture<V> runnableFuture() { | |
return new OwnedRunnableFuture<>(this); | |
} | |
@Override | |
public V call() throws Exception { | |
try { | |
target.run(); | |
return result; | |
} finally { | |
owner.disown(target); | |
} | |
} | |
} | |
@RequiredArgsConstructor | |
private class QueuePoller implements Runnable { | |
private final long pollTimeout; | |
private final TimeUnit pollTimeUnit; | |
@Override | |
@SuppressWarnings("UseSpecificCatch") | |
public void run() { | |
final String name = getClass().getSimpleName() + "#" + id; | |
log.trace("{} Started", name); | |
final Thread thread = Thread.currentThread(); | |
final String oldThreadName = thread.getName(); | |
try { | |
thread.setName(name); | |
OwnedRunnableFuture<?> task; | |
while (true) { | |
try (final TryLock lock = mainLock.lock()) { | |
task = dequeueTask(); | |
if (null == task) { | |
if (state.get() != RUNNING) | |
return; // Probably Shutdown state | |
notEmpty.await(pollTimeout, pollTimeUnit); | |
continue; | |
} | |
} | |
final Ownable<?, ?> ownable = task.ownable; | |
own(ownable.target, ownable); | |
log.trace("{} parent.execute", name); | |
parent.execute(task); | |
} | |
} catch (InterruptedException t) { | |
log.warn("{} Interrupted, so shutting down", name); | |
doShutdownNow(); | |
} catch (Throwable t) { | |
log.error("{} Unexpected failure, so shutting down", name, t); | |
doShutdownNow(); | |
throw t; | |
} finally { | |
log.trace("{} Terminating", name); | |
thread.setName(oldThreadName); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment