Skip to content

Instantly share code, notes, and snippets.

@honwhy
Created December 31, 2022 16:12
Show Gist options
  • Save honwhy/ac18c916319800de6ac819bbad0bd8dc to your computer and use it in GitHub Desktop.
Save honwhy/ac18c916319800de6ac819bbad0bd8dc to your computer and use it in GitHub Desktop.
Java 8 CompletableFuture Suggestions
package org.example;
import java.util.*;
import java.util.concurrent.*;
/**
 * Demonstrates bounding the run time of {@link CompletableFuture} tasks on Java 8.
 *
 * <p>Each round fans ten tasks out onto a shared pool, waits up to one second for all of them and
 * then cancels whatever has not finished. Cancelling a {@code CompletableFuture} does not interrupt
 * the thread running its task, so the pool itself ({@link InterruptableThreadPoolExecutor})
 * interrupts any worker whose task runs longer than the limit.
 */
public class App {
    /** Bounded work queue backing {@link #threadPoolExecutor}. */
    static BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(200);
    /** Shared pool on which every step of every round runs. */
    static ThreadPoolExecutor threadPoolExecutor =
            new InterruptableThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS, blockingQueue);
    /** Marker value stored for every step that finished without sleeping. */
    static Object PRESENT = new Object();

    /** Number of rounds played by {@link #main(String[])}. */
    private static final int ROUNDS = 10;
    /** Number of asynchronous steps started in each round. */
    private static final int STEPS_PER_ROUND = 10;
    /** How long a round waits for all of its steps before cancelling the rest. */
    private static final long ROUND_TIMEOUT_MILLIS = 1000;

    /**
     * Plays {@code ROUNDS} rounds one after another and then shuts the shared pool down.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        for (int j = 1; j <= ROUNDS; j++) {
            play(j);
        }
        threadPoolExecutor.shutdown();
    }

    /**
     * Plays one round: starts {@code STEPS_PER_ROUND} asynchronous steps (stand-ins for e.g. ten
     * HTTP requests), waits up to {@code ROUND_TIMEOUT_MILLIS} for all of them, cancels the ones
     * that are still pending and prints the numbers of the steps that recorded a result.
     *
     * @param j round number, used only in the printed messages
     */
    static void play(int j) {
        Map<Integer, Object> map = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> all = new ArrayList<>(STEPS_PER_ROUND);
        for (int i = 1; i <= STEPS_PER_ROUND; i++) {
            int step = i;
            System.out.printf("%d round %d step enter running\n", j, step);
            all.add(CompletableFuture.runAsync(() -> runStep(j, step, map), threadPoolExecutor));
        }
        try {
            CompletableFuture.allOf(all.toArray(new CompletableFuture<?>[0]))
                    .get(ROUND_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Keep the caller's interrupt visible instead of silently consuming it.
            Thread.currentThread().interrupt();
            cancelUnfinished(j, all);
        } catch (ExecutionException | TimeoutException | CancellationException e) {
            cancelUnfinished(j, all);
        }
        System.out.println(j + " round, play result: " + map.keySet());
    }

    /**
     * Body of a single step: with probability one half records {@link #PRESENT} under the step
     * number, otherwise sleeps for 1.5 to 2 seconds (longer than the round timeout).
     */
    private static void runStep(int j, int step, Map<Integer, Object> results) {
        // ThreadLocalRandom avoids contention on one Random shared by all worker threads.
        ThreadLocalRandom random = ThreadLocalRandom.current();
        if (random.nextDouble() < 0.5) {
            results.put(step, PRESENT);
            return;
        }
        try {
            TimeUnit.MILLISECONDS.sleep(1_500 + Math.round(random.nextDouble() * 500));
            System.err.printf("%d round %d step, finish sleep\n", j, step);
        } catch (InterruptedException e) {
            // The pool's timer cut this step short; restore the flag rather than swallow it.
            // ThreadPoolExecutor clears a worker's interrupt status before it runs the next task.
            Thread.currentThread().interrupt();
        }
    }

    /** Cancels every step of the round that has not completed, reporting the queue size around it. */
    private static void cancelUnfinished(int j, List<CompletableFuture<Void>> all) {
        System.out.println(j + " round, before clean: " + threadPoolExecutor.getQueue().size());
        // cancel(...) is a no-op on a future that is already done, so no pre-check is needed.
        all.forEach(f -> f.cancel(false));
        System.out.println(j + " round, after clean: " + threadPoolExecutor.getQueue().size());
    }

    /**
     * A {@link ThreadPoolExecutor} that interrupts the worker thread of any task still running
     * {@code TASK_TIMEOUT_MILLIS} after it started.
     *
     * <p>The timer scheduler and the task registry are static and therefore shared by all
     * instances: once any instance terminates, the scheduler is shut down and tasks of other
     * instances run without a time limit.
     */
    static class InterruptableThreadPoolExecutor extends ThreadPoolExecutor {
        /** Maximum run time granted to a task before its worker thread is interrupted. */
        private static final long TASK_TIMEOUT_MILLIS = 1000;
        /** Runs the delayed "interrupt the worker" actions. */
        static ScheduledExecutorService scheduledThreadPoolExecutor = Executors.newScheduledThreadPool(10);
        /** Pending timeout action of every task that is currently executing. */
        static Map<Runnable, ScheduledFuture<?>> workerSet = new ConcurrentHashMap<>();

        /**
         * Creates the pool; all arguments are passed unchanged to
         * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue)}.
         */
        public InterruptableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        /**
         * Arms the timeout for {@code r} before it starts running on thread {@code t}.
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            try {
                ScheduledFuture<?> timeout = scheduledThreadPoolExecutor.schedule(() -> {
                    // Interrupt only while r is still registered. computeIfPresent is atomic with
                    // the remove(r) in afterExecute, so a late timer cannot hit the next task that
                    // this worker thread picks up.
                    workerSet.computeIfPresent(r, (task, pending) -> {
                        t.interrupt();
                        return pending;
                    });
                }, TASK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                workerSet.put(r, timeout);
            } catch (RejectedExecutionException e) {
                // The shared scheduler is already shut down. Letting this propagate would kill the
                // worker and drop the task, so run the task without a time limit instead.
            }
            super.beforeExecute(t, r);
        }

        /**
         * Disarms the timeout of {@code r} once it has finished, normally or with {@code t}.
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            ScheduledFuture<?> timeout = workerSet.remove(r);
            if (timeout != null) {
                timeout.cancel(false);
            }
        }

        /**
         * Stops the timer scheduler once the pool has fully terminated. Doing this in
         * {@code shutdown()} would be too early: tasks still queued at that point must be able to
         * arm their timeout when they eventually start.
         */
        @Override
        protected void terminated() {
            try {
                scheduledThreadPoolExecutor.shutdownNow();
            } finally {
                super.terminated();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment