@alexkasko
Created November 9, 2012 14:08
ExecutorService wrapper, limits max parallel threads
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
// assumption: UnhandledException is the Commons Lang 2.x class; the original gist does not show its imports
import org.apache.commons.lang.UnhandledException;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* {@link ExecutorService} wrapper, limit max parallel threads to provided limit.
* May be useful for tasks with different parallelism limits sharing the same executor.
*
* @author alexkasko
* Date: 7/6/12
*/
public class LimitedExecutorServiceWrapper implements ExecutorService {
private final ExecutorService target;
private final Semaphore semaphore;
/**
* @param executor executor service to wrap
* @param parallelLimit max parallel threads available in provided executor service through this instance
*/
public LimitedExecutorServiceWrapper(ExecutorService executor, int parallelLimit) {
checkNotNull(executor, "Provided executor is null");
checkArgument(parallelLimit > 0, "Limit must be positive but was: '%s'", parallelLimit);
this.target = executor;
this.semaphore = new Semaphore(parallelLimit);
}
/**
* {@inheritDoc}
*/
@Override
public void shutdown() {
target.shutdown();
}
/**
* {@inheritDoc}
*/
@Override
public List<Runnable> shutdownNow() {
return target.shutdownNow();
}
/**
* {@inheritDoc}
*/
@Override
public boolean isShutdown() {
return target.isShutdown();
}
/**
* {@inheritDoc}
*/
@Override
public boolean isTerminated() {
return target.isTerminated();
}
/**
* {@inheritDoc}
*/
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return target.awaitTermination(timeout, unit);
}
/**
* {@inheritDoc}
*/
@Override
public <T> Future<T> submit(Callable<T> task) {
return target.submit(new SemaphoreCallable<T>(task));
}
/**
* {@inheritDoc}
*/
@Override
public <T> Future<T> submit(Runnable task, T result) {
return target.submit(new SemaphoreRunnable(task), result);
}
/**
* {@inheritDoc}
*/
@Override
public Future<?> submit(Runnable task) {
return target.submit(new SemaphoreRunnable(task));
}
/**
* {@inheritDoc}
*/
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return target.invokeAll(Collections2.transform(tasks, new SemaphoreCallableFun<T>()));
}
/**
* {@inheritDoc}
*/
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return target.invokeAll(Collections2.transform(tasks, new SemaphoreCallableFun<T>()), timeout, unit);
}
/**
* {@inheritDoc}
*/
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return target.invokeAny(Collections2.transform(tasks, new SemaphoreCallableFun<T>()));
}
/**
* {@inheritDoc}
*/
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return target.invokeAny(Collections2.transform(tasks, new SemaphoreCallableFun<T>()), timeout, unit);
}
/**
* {@inheritDoc}
*/
@Override
public void execute(Runnable command) {
target.execute(new SemaphoreRunnable(command));
}
private class SemaphoreCallable<T> implements Callable<T> {
private final Callable<T> target;
private SemaphoreCallable(Callable<T> target) {
checkNotNull(target, "Provided callable is null");
this.target = target;
}
@Override
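public T call() throws Exception {
// acquire outside the try block: if acquire() is interrupted, no permit is held, so none may be released
semaphore.acquire();
try {
return target.call();
} finally {
semaphore.release();
}
}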
}
private class SemaphoreRunnable implements Runnable {
private final Runnable target;
private SemaphoreRunnable(Runnable target) {
checkNotNull(target, "Provided runnable is null");
this.target = target;
}
@Override
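public void run() {
// acquire outside the guarded block so a failed acquire() never releases a permit it does not hold
try {
semaphore.acquire();
} catch(InterruptedException e) {
Thread.currentThread().interrupt(); // preserve the interrupt status for the pool
throw new UnhandledException(e);
}
try {
target.run();
} finally {
semaphore.release();
}
}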
}
private class SemaphoreCallableFun<T> implements Function<Callable<T>, SemaphoreCallable<T>> {
@SuppressWarnings("unchecked")
@Override
public SemaphoreCallable<T> apply(@Nullable Callable<T> input) {
return new SemaphoreCallable<T>(input);
}
}
}
@rwperrott

rwperrott commented Oct 21, 2019

Beware, this is seriously flawed (as I found in practice), thus not fit for purpose:

  • semaphore.acquire() is called after a Thread has already been allocated by the target ExecutorService, so there is no local limit on the number of threads blocked in semaphore.acquire(). The wrapper therefore doesn't prevent excessive Thread use, and may even exhaust the parent ExecutorService's threads, which can cause surprising concurrency deadlocks or even JVM memory exhaustion.
  • SemaphoreRunnable should be a Callable, with stored value/null, to prevent redundant Callable wrapping in the target ExecutorService.
  • The shutdown(), shutdownNow(), isShutdown(), isTerminated() and awaitTermination() method implementations are misleading, and thus pointless or dangerous, because they lack local-only shutdown code. The ThreadPoolExecutor source code shows how much is missing for a proper local shutdown.
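
To illustrate the first point, here is a minimal JDK-only sketch (not the gist's code; `BlockedPoolThreadsDemo` and `measure` are hypothetical names) that acquires a semaphore inside the task, as the wrapper does. With a pool of 4 threads and a limit of 1, all four pool threads end up occupied while only one task actually runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BlockedPoolThreadsDemo {

    /** Returns {pool threads occupied at once, max tasks actually running at once}. */
    static int[] measure(int poolSize, int limit) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Semaphore semaphore = new Semaphore(limit);
        CountDownLatch allOccupied = new CountDownLatch(poolSize);
        CountDownLatch mayFinish = new CountDownLatch(1);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        try {
            for (int i = 0; i < poolSize; i++) {
                pool.execute(() -> {
                    allOccupied.countDown(); // a pool thread is now taken by this task
                    try {
                        semaphore.acquire(); // most threads park here, doing no useful work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    try {
                        maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
                        mayFinish.await(); // hold the permit until the measurement is taken
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                        semaphore.release();
                    }
                });
            }
            allOccupied.await(5, TimeUnit.SECONDS);
            // No task can finish before mayFinish opens, so every started task still holds its thread.
            int occupied = poolSize - (int) allOccupied.getCount();
            mayFinish.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] { occupied, maxRunning.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] r = measure(4, 1);
        System.out.println("occupied threads: " + r[0] + ", running tasks: " + r[1]);
    }
}
```

The semaphore does cap how many tasks run at once, but it does so by parking pool threads, so any other user of the same pool sees those threads as busy.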

The way to do it properly would have been to extend AbstractExecutorService and have all the submit methods call super, not target. The execute method would add each task to a LinkedBlockingQueue work queue, and a separate Thread would loop, polling the work queue with a timeout for a Runnable, calling semaphore.acquire(), then calling target.execute(Runnable). There is even more work to do to shut down properly...
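
The design described above might be sketched roughly as follows. This is an illustration under stated assumptions, not the forked implementation mentioned in this thread: the class name `QueueingLimitedExecutor`, the 50 ms poll interval, and the simplified shutdown handling are all hypothetical. It also assumes the target hands tasks to other threads rather than running them in the caller, and that each permit is released when its task finishes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: queue locally, let one dispatcher thread wait for permits. */
final class QueueingLimitedExecutor extends AbstractExecutorService {
    private final Executor target;
    private final int limit;
    private final Semaphore permits;
    private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final CountDownLatch terminated = new CountDownLatch(1);
    private final Object lock = new Object(); // orders execute() against shutdown()
    private volatile boolean shutdown;

    QueueingLimitedExecutor(Executor target, int limit) {
        if (target == null) throw new NullPointerException("target");
        if (limit <= 0) throw new IllegalArgumentException("limit must be positive: " + limit);
        this.target = target;
        this.limit = limit;
        this.permits = new Semaphore(limit);
        Thread dispatcher = new Thread(this::dispatchLoop, "limited-executor-dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    private void dispatchLoop() {
        try {
            while (true) {
                Runnable task = queue.poll(50, TimeUnit.MILLISECONDS);
                if (task == null) {
                    if (shutdown && queue.isEmpty()) break; // drained after local shutdown
                    continue;
                }
                permits.acquire(); // only the dispatcher blocks here, never a pool thread
                try {
                    target.execute(() -> {
                        try { task.run(); } finally { permits.release(); }
                    });
                } catch (RejectedExecutionException rejected) {
                    permits.release(); // the task never started, so hand the permit back
                    if (task instanceof Future) ((Future<?>) task).cancel(false);
                }
            }
            permits.acquire(limit); // wait until every in-flight task has released its permit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            terminated.countDown();
        }
    }

    @Override public void execute(Runnable command) {
        if (command == null) throw new NullPointerException("command");
        synchronized (lock) {
            if (shutdown) throw new RejectedExecutionException("executor is shut down");
            queue.add(command);
        }
    }

    /** Local-only shutdown: the shared target executor is left untouched. */
    @Override public void shutdown() { synchronized (lock) { shutdown = true; } }

    /** Simplification: returns queued tasks but does not interrupt running ones. */
    @Override public List<Runnable> shutdownNow() {
        List<Runnable> pending = new ArrayList<>();
        synchronized (lock) { shutdown = true; queue.drainTo(pending); }
        return pending;
    }

    @Override public boolean isShutdown() { return shutdown; }
    @Override public boolean isTerminated() { return terminated.getCount() == 0; }
    @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return terminated.await(timeout, unit);
    }
}
```

Because submit(), invokeAll() and invokeAny() are inherited from AbstractExecutorService and all funnel into execute(), only execute() and the lifecycle methods need implementing. A task the dispatcher has already taken from the queue when shutdownNow() is called is still dispatched, which is one of several gaps a full implementation would have to close.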

@alexkasko
Author

alexkasko commented Oct 21, 2019 via email

@rwperrott

I forked this on GitHub, and made an enhanced version which fixes all these issues; one of my private projects uses all the implemented features, in multiple instances, so it does work.
