Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save rwperrott/d7b957906e927c8ed6bea7e4175888c0 to your computer and use it in GitHub Desktop.
Save rwperrott/d7b957906e927c8ed6bea7e4175888c0 to your computer and use it in GitHub Desktop.
ExecutorService wrapper, limits max parallel threads
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
/**
* based on https://gist.github.com/alexkasko/4045853
* <p>
* Implementation Changes from original:
* <ul>
* <li>Added override of the two newTaskFor methods to create a custom
* RunnableFuture, OwnedRunnableFuture, so the many submit, invokeAll, and
* invokeAny methods no longer need to be overridden, and all the hassle that
* entails!</li>
* <li>Has a simple single linked list, with next in the OwnedRunnableFuture, to
* queue up work via the implemented execute method.</li>
* <li>A ReentrantLock is used to block other threads while doing atomic
* internal operations.</li>
* <li>The constructor submits a Runnable to do delayed polling of the work
* queue, with shutdown code in its catch clauses, in part to support indirect
* shutdown via interruption by the parent ExecutorService; this Runnable, not
* the worker thread, blocks on the parallel limit and then maps ownership,
* keyed by the original Runnable/Callable reference.</li>
* <li>Shutdown and ShutdownNow only affect this class and use the ownership
* mapping to control its borrowed work Threads.</li>
* <li>Work Threads can be transferred between instances of this class, with the
* same parent ExecutorService, via the original Runnable/Callable reference;
* this transfers both ownership and parallel count use, to avoid the
* "make-work" of having to request a different Thread for the next limited
* phase of work.</li>
* <li>I eventually decided that the Semaphore was far better replaced by two
* int values and a Condition, all used inside the ReentrantLock clauses needed
* anyway; this also allows easier detection of the terminated state, and makes
* awaitTermination easy to implement.
* </li>
* </ul>
*/
@ToString // Had to exclude several fields because they were useless or caused stack-overflow recursion.
@Slf4j
public class LimitedExecutorServiceWrapper extends AbstractExecutorService {
// Lifecycle states stored in the 'state' field. The numeric order matters:
// isShutdown() tests state >= SHUTDOWN.
private static final int RUNNING = 1; // initial state; newTaskFor() only accepts work in this state
private static final int SHUTDOWN = 2; // set by shutdown()
private static final int SHUTDOWN_NOW = 3; // set by doShutdownNow()
private static final int TERMINATED = 4; // set by attemptTermination() once parallelCount is 0
/**
 * Builds a short label for log and exception messages, because
 * {@code Object.toString()} can return far too much.
 *
 * @param runOrCall the object to label; must not be null
 * @return the simple class name of {@code runOrCall}, a colon, then its
 *         {@link System#identityHashCode(Object)}
 */
private static String id(Object runOrCall) {
    final String simpleName = runOrCall.getClass().getSimpleName();
    final int identity = System.identityHashCode(runOrCall);
    return simpleName + ":" + identity;
}
// Source of the per-instance 'id' values; incremented once for every wrapper constructed.
private static final AtomicInteger counter = new AtomicInteger();
/**
 * Adapter that lets a {@link ReentrantLock} be used as the resource of a
 * try-with-resources statement, because ReentrantLock itself is not
 * AutoCloseable: {@link #lock()} acquires and returns this object, and
 * {@link #close()} releases.
 * <p>
 * Deliberately not done by wrapping lambdas (Java 8+), because that is a
 * worse hack.
 */
private static class TryLock implements AutoCloseable {
    private final ReentrantLock delegate = new ReentrantLock();

    /**
     * @return a new Condition bound to the wrapped lock
     */
    public Condition newCondition() {
        return delegate.newCondition();
    }

    /**
     * Acquires the wrapped lock.
     *
     * @return this, so the call can be used as a try-with-resources resource
     */
    public TryLock lock() {
        delegate.lock();
        return this;
    }

    /**
     * Releases the wrapped lock.
     */
    @Override
    public void close() {
        delegate.unlock();
    }
}
//
// Per-instance state
//
// Executor that runs both the QueuePoller and the queued tasks.
private final ExecutorService parent;
// Unique number of this wrapper, taken from the static counter; used in log and exception messages.
public final int id = counter.incrementAndGet();
// Lifecycle state (RUNNING, SHUTDOWN, SHUTDOWN_NOW or TERMINATED). Atomic, to minimise use of mainLock.
private final AtomicInteger state = new AtomicInteger(RUNNING);
//
// A singly linked list holding the queued tasks, with the next link stored in OwnedRunnableFuture.
@ToString.Exclude // Prevent stack overflow
private OwnedRunnableFuture<?> headTask; // Take from here
@ToString.Exclude // Prevent stack overflow
private OwnedRunnableFuture<?> tailTask; // Add here
// These two values, together with Condition belowLimit, replace a Semaphore.
// parallelCount is incremented in own() and decremented in disown(), both under mainLock.
private final int parallelLimit;
private int parallelCount;
/**
 * K = original Runnable or Callable target, V = Ownable<?,?>
 * This is only needed now to support ownership transfer, so it could be
 * removed if that functionality is not required.
 */
@ToString.Exclude // Prevent stack overflow
private final Map<Object, Ownable<?, ?>> ownedMap = new HashMap<>();
// Used to protect the task queue and ensure consistency with state.
@ToString.Exclude // not useful
private final TryLock mainLock = new TryLock();
// Used by the execute method to signal the QueuePoller that the queue is no longer empty.
@ToString.Exclude // not useful
private final Condition notEmpty = mainLock.newCondition();
// Used by the disown method to signal the own method that parallelCount has dropped below parallelLimit.
@ToString.Exclude // not useful
private final Condition belowLimit = mainLock.newCondition();
// Used by the attemptTermination() method to signal the awaitTermination() method of termination.
@ToString.Exclude // not useful
private final Condition onTerminate = mainLock.newCondition();
// Used by shutdownNow to stop the QueuePoller, even if it is waiting on Condition notEmpty.
@ToString.Exclude // not useful
private final Future<?> queuePollerFuture;
// Holds all the previously queued, but unexecuted, tasks after the first doShutdownNow() call;
// null until then.
@ToString.Exclude // Prevent stack overflow
private List<Runnable> neverExecuted;
/**
 * Creates a wrapper and immediately submits its QueuePoller to the parent
 * executor, so the poller occupies one parent task slot for as long as
 * this wrapper is running.
 *
 * @param parent executor service to wrap; must not itself be a
 * LimitedExecutorServiceWrapper
 * @param parallelLimit maximum number of tasks of this wrapper allowed to
 * run on the parent at the same time; must be positive
 * @param pollTimeout maximum time the poller waits on an empty work queue
 * before re-checking the state
 * @param pollTimeUnit unit of pollTimeout
 * @throws IllegalArgumentException if parallelLimit is not positive or
 * parent is a LimitedExecutorServiceWrapper
 */
public LimitedExecutorServiceWrapper(
        @NonNull ExecutorService parent,
        int parallelLimit,
        int pollTimeout,
        @NonNull TimeUnit pollTimeUnit) {
    if (parallelLimit <= 0) {
        // Message typo fixed: was "limit mast be positive".
        throw new IllegalArgumentException("limit must be positive but was: " + parallelLimit);
    }
    if (parent instanceof LimitedExecutorServiceWrapper) {
        throw new IllegalArgumentException("parent can't be a LimitedExecutorServiceWrapper");
    }
    this.parent = parent;
    this.parallelLimit = parallelLimit;
    //
    // Start the QueuePoller using the parent executor, so that the parent can shut down
    // this ExecutorService indirectly, by interrupting it.
    queuePollerFuture = parent.submit(new QueuePoller(pollTimeout, pollTimeUnit));
}
/**
 * Appends a task to the tail of the work queue and, when the queue was
 * empty beforehand, signals notEmpty. Signalling a Condition requires its
 * lock, so the caller must hold mainLock.
 *
 * @param newTask the task to append
 */
private void enqueueTask(OwnedRunnableFuture<?> newTask) {
    final boolean wasEmpty = (null == headTask);
    if (wasEmpty) {
        headTask = newTask;
    } else {
        tailTask.next = newTask;
    }
    tailTask = newTask;
    if (wasEmpty) {
        notEmpty.signal();
    }
}
/**
 * Removes and returns the task at the head of the work queue.
 *
 * @return the oldest queued task, or null when the queue is empty
 */
private OwnedRunnableFuture<?> dequeueTask() {
    final OwnedRunnableFuture<?> first = headTask;
    if (first == null) {
        return null;
    }
    headTask = first.next;
    if (headTask == null) {
        // The queue is now empty, so there is no tail either.
        tailTask = null;
    }
    return first;
}
/**
 * Performs the immediate-shutdown state change. Called by the shutdownNow
 * method and by the QueuePoller catch clauses; acquires mainLock itself.
 * <p>
 * The first call sets the state to SHUTDOWN_NOW, moves every queued task
 * into the neverExecuted list, empties the queue and attempts termination.
 * Later calls change nothing.
 *
 * @return the neverExecuted list (the same list instance on every call)
 */
private List<Runnable> doShutdownNow() {
    try (final TryLock ignored = mainLock.lock()) {
        if (null != neverExecuted) {
            // Already done by an earlier call.
            return neverExecuted;
        }
        state.set(SHUTDOWN_NOW);
        // Move the whole task queue to the neverExecuted list.
        neverExecuted = new ArrayList<>();
        OwnedRunnableFuture<?> cursor = headTask;
        while (null != cursor) {
            neverExecuted.add(cursor);
            cursor = cursor.next;
        }
        headTask = tailTask = null;
        log.trace("doShutdownNow with {} tasks never executed", neverExecuted.size());
        //
        attemptTermination();
        return neverExecuted;
    }
}
/**
 * Moves the state to TERMINATED and wakes every awaitTermination() waiter,
 * but only when no task is currently owned (parallelCount is 0).
 * Signalling a Condition requires its lock, so the caller must hold
 * mainLock.
 */
private void attemptTermination() {
    if (0 != parallelCount) {
        return; // tasks are still running; a later disown() will try again
    }
    state.set(TERMINATED);
    onTerminate.signalAll();
}
/**
 * Claims one parallel slot for a task and records this wrapper as its
 * owner, blocking while all parallelLimit slots are in use.
 *
 * @param runOrCall the original Runnable or Callable, used as the
 * ownership key
 * @param owned the wrapper object of that task
 * @throws InterruptedException if interrupted while waiting for a slot;
 * in that case no slot is taken and nothing is recorded
 */
private void own(Object runOrCall, Ownable<?, ?> owned) throws InterruptedException {
    if (log.isTraceEnabled()) {
        log.trace("{}.own({},{})", id, id(runOrCall), owned);
    }
    //
    try (final TryLock ignored = mainLock.lock()) {
        // Must be a loop, not an 'if': Condition.await() may wake spuriously, and
        // disown() uses signalAll(), so several waiters (the QueuePoller plus
        // transferFrom callers) can wake for a single freed slot.
        while (parallelCount >= parallelLimit) {
            belowLimit.await();
        }
        parallelCount++;
        owned.owner = this;
        ownedMap.put(runOrCall, owned);
    }
}
/**
 * Releases the parallel slot held for a task and removes its ownership
 * record. Wakes threads blocked in own() when the count leaves the limit,
 * and attempts termination when a shutdown has been requested.
 * <p>
 * NOTE(review): after shutdown() this can reach TERMINATED while tasks are
 * still queued, which would make a later call throw - confirm and fix
 * together with attemptTermination() and the QueuePoller.
 *
 * @param runOrCall the original Runnable or Callable used as ownership key
 * @return the ownership record that was removed
 * @throws IllegalStateException if already terminated, if runOrCall is an
 * Ownable, or if runOrCall is not owned by this wrapper
 * @throws IllegalArgumentException if runOrCall is not owned and is
 * neither a Runnable nor a Callable
 */
private Ownable<?, ?> disown(Object runOrCall) {
    if (log.isTraceEnabled()) {
        log.trace("{}.disown({})", id, id(runOrCall));
    }
    try (final TryLock ignored = mainLock.lock()) {
        if (isTerminated()) {
            throw new IllegalStateException("Bug, shouldn't be called after Termination");
        }
        //
        final Ownable<?, ?> released = ownedMap.remove(runOrCall);
        if (null == released) {
            // Nothing was recorded under this key: report the most specific cause.
            if (runOrCall instanceof Ownable) {
                throw new IllegalStateException("bug, runOrCall cannot be an Ownable");
            }
            if (runOrCall instanceof Runnable || runOrCall instanceof Callable) {
                throw new IllegalStateException(id(runOrCall) + " not owned by " + id);
            }
            throw new IllegalArgumentException(id(runOrCall) + " not a Runnable or Callable");
        }
        //
        final boolean wasAtLimit = (parallelCount == parallelLimit);
        parallelCount--;
        if (wasAtLimit) {
            belowLimit.signalAll();
        }
        //
        if (state.get() >= SHUTDOWN) {
            attemptTermination();
        }
        //
        return released;
    }
}
/**
 * Takes over a running task from another wrapper: the old owner releases
 * its parallel slot, then this wrapper claims one of its own, blocking
 * while this wrapper is at its limit.
 *
 * @param runOrCall the original Runnable or Callable of the task
 * @param oldOwner the current owner; must have the same parent as this
 * @throws IllegalStateException if this wrapper is not running
 * @throws IllegalArgumentException if the parents differ
 * @throws InterruptedException if interrupted while waiting for a slot
 */
public void transferFrom(@NonNull Object runOrCall, @NonNull LimitedExecutorServiceWrapper oldOwner) throws InterruptedException {
    final boolean running = (RUNNING == state.get());
    if (!running) {
        throw new IllegalStateException("Not Running");
    }
    final boolean sameParent = Objects.equals(parent, oldOwner.parent);
    if (!sameParent) {
        throw new IllegalArgumentException("different parent");
    }
    // Release from the old owner first, then claim here.
    final Ownable<?, ?> released = oldOwner.disown(runOrCall);
    own(runOrCall, released);
}
/**
 * Hands a running task over to another wrapper; the mirror image of
 * {@code transferFrom}, to which it delegates.
 *
 * @param runOrCall the original Runnable or Callable of the task
 * @param newOwner the wrapper to take ownership; must have the same parent
 * as this
 * @throws InterruptedException if interrupted while the new owner waits
 * for a free slot
 */
public void transferTo(@NonNull Object runOrCall, @NonNull LimitedExecutorServiceWrapper newOwner) throws InterruptedException {
    final LimitedExecutorServiceWrapper currentOwner = this;
    newOwner.transferFrom(runOrCall, currentOwner);
}
/**
 * {@inheritDoc}
 * <p>
 * Only this wrapper is affected, not the parent executor. Has no effect
 * unless the wrapper is currently running.
 */
@Override
public void shutdown() {
    if (state.compareAndSet(RUNNING, SHUTDOWN)) {
        // Wake the QueuePoller so it notices the state change at once, instead of
        // occupying a parent thread until its current poll timeout elapses.
        try (final TryLock ignored = mainLock.lock()) {
            notEmpty.signal();
        }
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Drains the queue via doShutdownNow(), then cancels the QueuePoller with
 * interruption.
 */
@Override
public List<Runnable> shutdownNow() {
    final List<Runnable> unexecuted = doShutdownNow();
    // Interrupt the poller even if it is waiting on the notEmpty Condition.
    queuePollerFuture.cancel(true);
    return unexecuted;
}
/**
 * {@inheritDoc}
 * <p>
 * True for every state from SHUTDOWN onwards.
 */
@Override
public boolean isShutdown() {
    final int current = state.get();
    return current >= SHUTDOWN;
}
/**
 * {@inheritDoc}
 * <p>
 * True only once the state has reached TERMINATED.
 */
@Override
public boolean isTerminated() {
    final int current = state.get();
    return TERMINATED == current;
}
/**
 * {@inheritDoc}
 *
 * @return true if this wrapper reached the terminated state, false if the
 * timeout elapsed first
 */
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    long remainingNanos = unit.toNanos(timeout);
    try (final TryLock ignored = mainLock.lock()) {
        // Re-check the state after every wakeup: a Condition may wake spuriously,
        // so a single timed await cannot by itself prove termination.
        while (!isTerminated()) {
            if (remainingNanos <= 0L) {
                return false;
            }
            remainingNanos = onTerminate.awaitNanos(remainingNanos);
        }
        return true;
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Wraps the runnable in a SemaphoreRunnable owned by this wrapper and
 * returns that wrapper's OwnedRunnableFuture.
 *
 * @throws RejectedExecutionException if this wrapper is not running
 */
@Override
protected <T> OwnedRunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    final int current = state.get();
    if (RUNNING != current) {
        throw new RejectedExecutionException();
    }
    //
    final SemaphoreRunnable<T> wrapped = new SemaphoreRunnable<>(this, runnable, value);
    return wrapped.runnableFuture();
}
/**
 * {@inheritDoc}
 * <p>
 * Wraps the callable in a SemaphoreCallable owned by this wrapper and
 * returns that wrapper's OwnedRunnableFuture.
 *
 * @throws RejectedExecutionException if this wrapper is not running
 */
@Override
protected <T> OwnedRunnableFuture<T> newTaskFor(Callable<T> callable) {
    final int current = state.get();
    if (RUNNING != current) {
        throw new RejectedExecutionException();
    }
    //
    final SemaphoreCallable<T> wrapped = new SemaphoreCallable<>(this, callable);
    return wrapped.runnableFuture();
}
/**
 * {@inheritDoc}
 * <p>
 * Appends the command to the internal work queue; the QueuePoller later
 * hands it to the parent executor. A command that is already an
 * OwnedRunnableFuture (as produced by newTaskFor) is queued as is; any
 * other Runnable is wrapped first.
 *
 * @throws RejectedExecutionException if this wrapper is not running
 */
@Override
public void execute(@NonNull Runnable command) {
    try (final TryLock ignored = mainLock.lock()) {
        // Reject instead of silently queueing: once the wrapper has left RUNNING the
        // poller stops (or has stopped), so a task queued now might never be executed.
        if (state.get() != RUNNING) {
            throw new RejectedExecutionException("LimitedExecutorServiceWrapper " + id + " is not running");
        }
        final OwnedRunnableFuture<?> task;
        if (command instanceof OwnedRunnableFuture) {
            task = (OwnedRunnableFuture<?>) command;
        } else {
            task = newTaskFor(command, null);
        }
        enqueueTask(task);
    }
}
/**
 * The FutureTask type handed to the parent executor.
 * <p>
 * It serves two purposes: the execute method recognises it and so does not
 * wrap a task a second time, and it exposes the wrapped Ownable so that
 * ownership can be managed.
 *
 * @param <V> result type of the future
 */
protected static final class OwnedRunnableFuture<V> extends FutureTask<V> {
    /**
     * The wrapped ownable, kept so the QueuePoller can take ownership of it.
     */
    private final Ownable<V, ?> ownable;
    /**
     * Next link of the singly linked work queue; storing it here avoids a
     * separate wrapping node object.
     */
    private transient OwnedRunnableFuture<?> next;

    /**
     * @param ownable wrapper around a Runnable target
     */
    protected OwnedRunnableFuture(SemaphoreRunnable<V> ownable) {
        super(ownable);
        this.ownable = ownable;
    }

    /**
     * @param ownable wrapper around a Callable target
     */
    protected OwnedRunnableFuture(SemaphoreCallable<V> ownable) {
        super(ownable);
        this.ownable = ownable;
    }

    /**
     * @return the string form of the wrapped ownable
     */
    @Override
    public String toString() {
        final Ownable<V, ?> wrapped = ownable;
        return wrapped.toString();
    }
}
/**
 * Common base of the two task wrappers. It holds the shared properties as
 * plain fields, mostly final, so that no accessor methods are needed.
 *
 * @param <V> result type of the future created for the task
 * @param <T> type of the wrapped target, Runnable or Callable
 */
@ToString
protected static abstract class Ownable<V, T> {
    /**
     * The wrapped Runnable or Callable; it is both the ownership key and
     * the thing that is executed.
     */
    protected final T target;
    /**
     * The wrapper currently owning this task. Not final, and volatile,
     * because ownership can be transferred between instances that share
     * the same parent ExecutorService.
     */
    protected volatile LimitedExecutorServiceWrapper owner;

    /**
     * @param owner the wrapper that initially owns the task
     * @param target the Runnable or Callable to wrap
     */
    @SuppressWarnings("OverridableMethodCallInConstructor")
    protected Ownable(LimitedExecutorServiceWrapper owner, T target) {
        this.target = target;
        this.owner = owner;
    }

    /**
     * @return a new OwnedRunnableFuture wrapping this object
     */
    abstract OwnedRunnableFuture<V> runnableFuture();
}
/**
 * Wrapper that runs a Callable target and afterwards releases the target's
 * parallel slot with whichever wrapper owns it at that moment.
 *
 * @param <V> result type of the target
 */
protected static final class SemaphoreCallable<V> extends Ownable<V, Callable<V>>
        implements Callable<V> {
    @SuppressWarnings("LeakingThisInConstructor")
    private SemaphoreCallable(LimitedExecutorServiceWrapper owner, Callable<V> target) {
        super(owner, target);
    }

    /**
     * Calls the target, then disowns it whether it returned or threw.
     *
     * @return the value returned by the target
     * @throws Exception whatever the target throws
     */
    @Override
    public V call() throws Exception {
        final V outcome;
        try {
            outcome = target.call();
        } finally {
            // Read owner only now: ownership may have been transferred while the target ran.
            owner.disown(target);
        }
        return outcome;
    }

    @Override
    protected OwnedRunnableFuture<V> runnableFuture() {
        return new OwnedRunnableFuture<>(this);
    }
}
/**
 * Wrapper that runs a Runnable target and afterwards releases the target's
 * parallel slot with whichever wrapper owns it at that moment.
 *
 * @param <V> type of the fixed result reported once the target has run
 */
protected static final class SemaphoreRunnable<V> extends Ownable<V, Runnable>
        implements Callable<V> {
    /**
     * Value returned by call() after the target completes normally.
     */
    private final V result;

    @SuppressWarnings("LeakingThisInConstructor")
    private SemaphoreRunnable(LimitedExecutorServiceWrapper owner, Runnable target, V result) {
        super(owner, target);
        this.result = result;
    }

    /**
     * Runs the target, then disowns it whether it returned or threw.
     *
     * @return the result supplied at construction
     * @throws Exception declared by Callable
     */
    @Override
    public V call() throws Exception {
        try {
            target.run();
        } finally {
            // Read owner only now: ownership may have been transferred while the target ran.
            owner.disown(target);
        }
        return result;
    }

    @Override
    protected OwnedRunnableFuture<V> runnableFuture() {
        return new OwnedRunnableFuture<>(this);
    }
}
/**
 * The dispatcher loop of a wrapper, run on a thread of the parent executor.
 * <p>
 * It repeatedly takes the oldest queued task, claims a parallel slot for
 * it (blocking here, not in a worker thread, while the limit is reached)
 * and hands it to the parent executor. While the queue is empty it waits
 * on notEmpty for at most the poll timeout and then re-checks the state.
 * <p>
 * It stops when the queue is empty and the wrapper is no longer running,
 * or when it is interrupted or fails; the latter two perform an immediate
 * shutdown of this wrapper.
 * <p>
 * NOTE(review): a task that was dequeued when an interruption or failure
 * strikes is neither executed nor added to the never-executed list -
 * confirm whether it should be cancelled.
 */
@RequiredArgsConstructor
private class QueuePoller implements Runnable {
    private final long pollTimeout;
    private final TimeUnit pollTimeUnit;

    @Override
    @SuppressWarnings("UseSpecificCatch")
    public void run() {
        final String name = getClass().getSimpleName() + "#" + id;
        log.trace("{} Started", name);
        final Thread thread = Thread.currentThread();
        final String oldThreadName = thread.getName();
        try {
            // Rename the borrowed thread while polling; restored in the finally clause.
            thread.setName(name);
            OwnedRunnableFuture<?> task;
            while (true) {
                try (final TryLock lock = mainLock.lock()) {
                    task = dequeueTask();
                    if (null == task) {
                        if (state.get() != RUNNING) {
                            // Queue drained after a shutdown request. If no task is running
                            // either, nothing else would ever move the state to TERMINATED
                            // (disown() only does so when a task finishes), so do it here.
                            attemptTermination();
                            return;
                        }
                        notEmpty.await(pollTimeout, pollTimeUnit);
                        continue;
                    }
                }
                // Outside the lock clause: claim a slot (may block), then dispatch.
                final Ownable<?, ?> ownable = task.ownable;
                own(ownable.target, ownable);
                log.trace("{} parent.execute", name);
                parent.execute(task);
            }
        } catch (InterruptedException t) {
            log.warn("{} Interrupted, so shutting down", name);
            doShutdownNow();
        } catch (Throwable t) {
            log.error("{} Unexpected failure, so shutting down", name, t);
            doShutdownNow();
            throw t;
        } finally {
            log.trace("{} Terminating", name);
            thread.setName(oldThreadName);
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment