@Bill
Last active March 4, 2021 18:15
An experiment with a ThreadPoolExecutor and various sync/async workQueues for https://cwiki.apache.org/confluence/display/GEODE/Thread+Pools
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
class Scratch {

  /** Runs one experiment per combination of core-thread timeout, queue limit, and core pool size. */
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    for (final boolean allowCoreThreadTimeout : Arrays.asList(false, true)) {
      for (final int queueLimit : Arrays.asList(0, 5)) {
        for (final int corePoolSize : Arrays.asList(0, 5)) {
          runExperiment(corePoolSize, 5, queueLimit, allowCoreThreadTimeout, 1);
        }
      }
    }
  }

  private static void runExperiment(final int corePoolSize, final int maxPoolSize,
      final int queueLimit, final boolean allowCoreThreadTimeout,
      final long keepAliveSeconds)
      throws InterruptedException, ExecutionException {
    // A limit of 0 selects a SynchronousQueue (direct hand-off, no buffering);
    // any other limit selects a bounded LinkedBlockingQueue.
    final BlockingQueue<Runnable> workQueue =
        queueLimit == 0 ? new SynchronousQueue<>() : new LinkedBlockingQueue<>(queueLimit);
    final ThreadFactory threadFactory = Executors.defaultThreadFactory();
    final ThreadPoolExecutor tpe =
        new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS,
            workQueue, threadFactory);
    tpe.allowCoreThreadTimeOut(allowCoreThreadTimeout);
    System.out.println("---------- Experiment Starting -------------");
    System.out.println("          corePoolSize: " + corePoolSize);
    System.out.println("           maxPoolSize: " + maxPoolSize);
    System.out.println("   BlockingQueue limit: " + queueLimit);
    System.out.println("allowCoreThreadTimeout: " + allowCoreThreadTimeout);
    final List<Future<?>> futures = new ArrayList<>();
    for (int i = 1; i <= 3; i++) {
      futures.add(tpe.submit(new SampleWorker("task " + i)));
    }
    // getActiveCount() is only an approximate snapshot taken right after submission.
    System.out.println("        concurrency: " + tpe.getActiveCount());
    for (Future<?> future : futures) {
      future.get();
    }
    // shutdown() has not been called yet, so this cannot return early: it blocks for the
    // full 2 seconds, long enough for idle threads to exceed the keep-alive time.
    tpe.awaitTermination(2, TimeUnit.SECONDS);
    System.out.println("      Leftover threads: " + tpe.getPoolSize());
    tpe.shutdown();
    System.out.println("---------- Experiment Complete -------------");
  }
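
  /** A task that prints three progress steps, 500 ms apart, then reports completion. */
  static class SampleWorker implements Runnable {
    private final String workerName;

    SampleWorker(String tName) {
      workerName = tName;
    }

    @Override
    public void run() {
      try {
        for (int i = 1; i <= 3; i++) {
          Thread.sleep(500);
          System.out.println(this.workerName + " step " + i);
        }
        System.out.println(this.workerName + " finished");
      } catch (InterruptedException ex) {
        // Restore the interrupt flag before reporting, so the pool thread can still see it.
        Thread.currentThread().interrupt();
        ex.printStackTrace();
      }
    }
  }
}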

Bill commented Mar 4, 2021

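Sample output follows. Notice that when the core pool size is zero and we use a non-synchronous queue (BlockingQueue limit > 0), the tasks are executed sequentially. Once the pool has reached corePoolSize, the ThreadPoolExecutor prefers queuing a task to starting another thread, and it only starts extra threads when the queue refuses a task. Three tasks fit easily in a queue of five, so a single thread works through them one after another.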
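
To make the queuing behaviour easier to see than it is in interleaved console output, here is a minimal sketch that holds three tasks open on a latch and then reads the pool size and queue depth. It is not part of the experiment above: the class name `QueueBeforeThreads` and the helper `observe` are made up for illustration, and it only covers the bounded `LinkedBlockingQueue` case with three tasks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class for illustration; not part of the gist above.
class QueueBeforeThreads {

  /** Returns {pool size, queued tasks} seen while three blocking tasks are pending. */
  static int[] observe(int corePoolSize) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        corePoolSize, 5, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5));
    // With a queue of 5 and only 3 tasks the queue never fills, so the pool
    // is not expected to grow past max(1, corePoolSize) threads.
    int expectedRunning = Math.max(1, Math.min(3, corePoolSize));
    CountDownLatch started = new CountDownLatch(expectedRunning);
    CountDownLatch release = new CountDownLatch(1);
    Runnable task = () -> {
      started.countDown();
      try {
        release.await(); // stay busy until the snapshot has been taken
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    try {
      for (int i = 0; i < 3; i++) {
        pool.submit(task);
      }
      started.await(); // every thread the pool will start is now running a task
      return new int[] {pool.getPoolSize(), pool.getQueue().size()};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      release.countDown(); // let the held tasks finish
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    int[] zeroCore = observe(0);
    int[] fiveCore = observe(5);
    System.out.println("corePoolSize 0: threads=" + zeroCore[0] + " queued=" + zeroCore[1]);
    System.out.println("corePoolSize 5: threads=" + fiveCore[0] + " queued=" + fiveCore[1]);
  }
}
```

With corePoolSize 0 this should report one thread and two queued tasks, which matches the `concurrency: 1` line in the sample output; with corePoolSize 5 it should report three threads and an empty queue.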

But this is easily overcome by setting corePoolSize > 0 (we use 5 in these tests). While fewer than corePoolSize threads are running, the ThreadPoolExecutor starts a new thread for each submitted task instead of queuing it. This immediate dispatch takes place regardless of the type (synchronous vs. non-synchronous) of BlockingQueue specified.
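
The immediate dispatch described above can be checked in isolation with a small sketch, assuming a core pool size of 5 as in the tests. It runs three tasks strictly one at a time, so earlier threads are already idle on each new submission, and then reads the pool size. The class name `CoreThreadsPerSubmission` and its helper method are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class for illustration; not part of the gist above.
class CoreThreadsPerSubmission {

  /** Runs three tasks one at a time and returns the pool size afterwards. */
  static int poolSizeAfterSequentialTasks(BlockingQueue<Runnable> workQueue) {
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, workQueue);
    Runnable noop = () -> { };
    try {
      for (int i = 0; i < 3; i++) {
        // Wait for each task to finish, so earlier threads are idle on the next submit.
        pool.submit(noop).get();
      }
      return pool.getPoolSize();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("SynchronousQueue:       "
        + poolSizeAfterSequentialTasks(new SynchronousQueue<>()));
    System.out.println("LinkedBlockingQueue(5): "
        + poolSizeAfterSequentialTasks(new LinkedBlockingQueue<>(5)));
  }
}
```

Both queue types should leave three threads in the pool even though one thread could have handled all three tasks. This is also why the runs with corePoolSize 5 and no core-thread timeout report `Leftover threads: 3`.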

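The tests also show that it is possible to drain the core pool when demand drops, by calling allowCoreThreadTimeOut(true). With that setting, "Leftover threads" falls to 0 once the idle threads outlive the keep-alive time, instead of staying at 3.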
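
As a rough sketch of the draining effect on its own, the following runs three trivial tasks on a pool of five core threads and then polls the pool size for up to two seconds. The class name `CoreThreadTimeout`, the helper `leftoverThreads`, and the 100 ms keep-alive are assumptions chosen for this illustration (the experiment above uses 1 second).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class for illustration; not part of the gist above.
class CoreThreadTimeout {

  /** Runs three tasks, waits up to 2 s for idle threads to expire, returns the pool size. */
  static int leftoverThreads(boolean allowCoreThreadTimeOut) {
    // The keep-alive must be greater than zero when core-thread timeout is enabled.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        5, 5, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(5));
    pool.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
    Runnable noop = () -> { };
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int i = 0; i < 3; i++) {
        futures.add(pool.submit(noop));
      }
      for (Future<?> future : futures) {
        future.get();
      }
      // Poll until the pool is empty or the deadline passes.
      long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
      while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
        Thread.sleep(20);
      }
      return pool.getPoolSize();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("timeout disabled: " + leftoverThreads(false));
    System.out.println("timeout enabled:  " + leftoverThreads(true));
  }
}
```

With the timeout disabled the three core threads should still be present after the wait; with it enabled the pool should shrink to zero shortly after the keep-alive elapses.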

---------- Experiment Starting -------------
          corePoolSize: 0
           maxPoolSize: 5
   BlockingQueue limit: 0
allowCoreThreadTimeout: false
        concurrency: 3
task 1 step 1
task 2 step 1
task 3 step 1
task 3 step 2
task 1 step 2
task 2 step 2
task 2 step 3
task 3 step 3
task 3 finished
task 1 step 3
task 1 finished
task 2 finished
      Leftover threads: 0
---------- Experiment Complete -------------
---------- Experiment Starting -------------
          corePoolSize: 5
           maxPoolSize: 5
   BlockingQueue limit: 0
allowCoreThreadTimeout: false
        concurrency: 3
task 3 step 1
task 1 step 1
task 2 step 1
task 1 step 2
task 2 step 2
task 3 step 2
task 1 step 3
task 3 step 3
task 3 finished
task 2 step 3
task 2 finished
task 1 finished
      Leftover threads: 3
---------- Experiment Complete -------------
---------- Experiment Starting -------------
          corePoolSize: 0
           maxPoolSize: 5
   BlockingQueue limit: 5
allowCoreThreadTimeout: false
        concurrency: 1
task 1 step 1
task 1 step 2
task 1 step 3
task 1 finished
task 2 step 1
task 2 step 2
task 2 step 3
task 2 finished
task 3 step 1
task 3 step 2
task 3 step 3
task 3 finished
      Leftover threads: 0
---------- Experiment Complete -------------
---------- Experiment Starting -------------
          corePoolSize: 5
           maxPoolSize: 5
   BlockingQueue limit: 5
allowCoreThreadTimeout: false
        concurrency: 3
task 1 step 1
task 2 step 1
task 3 step 1
task 2 step 2
task 1 step 2
task 3 step 2
task 1 step 3
task 1 finished
task 2 step 3
task 2 finished
task 3 step 3
task 3 finished
      Leftover threads: 3
---------- Experiment Complete -------------
---------- Experiment Starting -------------
          corePoolSize: 0
           maxPoolSize: 5
   BlockingQueue limit: 0
allowCoreThreadTimeout: true
        concurrency: 3
task 1 step 1
task 2 step 1
task 3 step 1
task 1 step 2
task 3 step 2
task 2 step 2
task 2 step 3
task 2 finished
task 1 step 3
task 1 finished
task 3 step 3
task 3 finished
      Leftover threads: 0
---------- Experiment Complete -------------
---------- Experiment Starting -------------
          corePoolSize: 5
           maxPoolSize: 5
   BlockingQueue limit: 0
allowCoreThreadTimeout: true
        concurrency: 3
task 3 step 1
task 1 step 1
task 2 step 1
task 3 step 2
task 1 step 2
task 2 step 2
task 2 step 3
task 2 finished
task 1 step 3
task 1 finished
task 3 step 3
task 3 finished
      Leftover threads: 0
---------- Experiment Complete -------------
---------- Experiment Starting -------------
          corePoolSize: 0
           maxPoolSize: 5
   BlockingQueue limit: 5
allowCoreThreadTimeout: true
        concurrency: 1
task 1 step 1
task 1 step 2
task 1 step 3
task 1 finished
task 2 step 1
task 2 step 2
task 2 step 3
task 2 finished
task 3 step 1
task 3 step 2
task 3 step 3
task 3 finished
      Leftover threads: 0
---------- Experiment Complete -------------
---------- Experiment Starting -------------
          corePoolSize: 5
           maxPoolSize: 5
   BlockingQueue limit: 5
allowCoreThreadTimeout: true
        concurrency: 3
task 2 step 1
task 3 step 1
task 1 step 1
task 2 step 2
task 1 step 2
task 3 step 2
task 2 step 3
task 2 finished
task 1 step 3
task 1 finished
task 3 step 3
task 3 finished
      Leftover threads: 0
---------- Experiment Complete -------------

Process finished with exit code 0
