Skip to content

Instantly share code, notes, and snippets.

@popcornylu
Last active June 8, 2016 04:05
Show Gist options
  • Save popcornylu/b5d7c7869b060f5b3bbdd338e9e4853f to your computer and use it in GitHub Desktop.
Save popcornylu/b5d7c7869b060f5b3bbdd338e9e4853f to your computer and use it in GitHub Desktop.
Throttle the number of concurrent tasks in a thread pool.
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
public class ThrottlingInvoker{
private Executor executor;
private int limit;
private int concurrent;
private LinkedList<Task> queue = new LinkedList<>();
public ThrottlingInvoker(Executor executor, int limit) {
this.executor = executor;
this.limit = limit;
}
synchronized
public <T> CompletableFuture<T> invoke(Supplier<T> supplier) {
CompletableFuture future = new CompletableFuture();
if (concurrent < limit) {
acquire(new Task(supplier, future));
} else {
queue.addFirst(new Task(supplier, future));
}
return future;
}
synchronized
private void acquire(Task task) {
concurrent++;
CompletableFuture wrappedFuture = CompletableFuture.supplyAsync(task.supplier, executor);
BiConsumer<Object, Exception> onComplete = (result, exception) -> {
if (exception != null) {
task.future.completeExceptionally(exception);
} else {
task.future.complete(result);
}
release();
};
wrappedFuture.whenComplete(onComplete);
}
synchronized
private void release() {
concurrent --;
Task task = queue.removeLast();
if (task != null) {
acquire(task);
}
}
public int getConcurrent() {
return concurrent;
}
class Task {
private final Supplier supplier;
private final CompletableFuture future;
Task(Supplier supplier, CompletableFuture future) {
this.supplier = supplier;
this.future = future;
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
ThrottlingInvoker invoker = new ThrottlingInvoker(executorService, 20);
Random rand = new Random();
for (int i=0; i<200; i++) {
final int fi = i;
invoker.invoke(() -> {
System.out.println("invoked:" + fi);
try {
Thread.sleep(rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task " + fi;
}).whenComplete((name, throwable)-> {
System.out.println("complete: " + name + " concurrent:" + invoker.getConcurrent());
});
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment