Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save krmahadevan/df96ffb180cb92f670b38d531f5a8949 to your computer and use it in GitHub Desktop.
Save krmahadevan/df96ffb180cb92f670b38d531f5a8949 to your computer and use it in GitHub Desktop.
Sample that demonstrates how to use CompletableFuture, with callbacks that listen for task completion, on a ThreadPoolExecutor backed by a PriorityBlockingQueue.
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.*;
//Works
/**
 * Demonstrates running prioritized tasks through {@link CompletableFuture} on a
 * {@link ThreadPoolExecutor} whose work queue is a {@link PriorityBlockingQueue},
 * with a completion callback attached to every task.
 *
 * <p>{@code CompletableFuture.runAsync} hands the executor a JDK-internal wrapper
 * object rather than the caller's runnable, so a priority queue cannot order those
 * entries without reflecting into JDK internals (which fails on strongly
 * encapsulated JDKs). This sample instead submits a {@link PriorityFutureTask}: a
 * comparable runnable that owns and completes its own future, so the queue can
 * order the entries directly.
 *
 * <p>Tasks are only reordered while they wait in the queue, i.e. when every pool
 * thread is already busy at the time they are submitted.
 */
public class CompletableFuturePriorityQueueExample {

    /** A named unit of work with an integer priority; smaller values sort first. */
    static class Task implements Comparable<Task> {
        private final String name;
        private final int priority;

        /**
         * @param name     label printed when the task is processed
         * @param priority ordering key; a smaller value compares as "less"
         */
        public Task(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }

        /** @return the label given at construction */
        public String getName() {
            return name;
        }

        /** @return the ordering key given at construction */
        public int getPriority() {
            return priority;
        }

        /** Orders tasks by ascending {@code priority}. */
        @Override
        public int compareTo(Task other) {
            return Integer.compare(this.priority, other.priority);
        }
    }

    /** Runnable that prints the wrapped {@link Task}; ordered the same way as its task. */
    static class MyRunnable implements Runnable, Comparable<MyRunnable> {
        private final Task task;

        /** @param task the task whose name and priority are printed by {@link #run()} */
        public MyRunnable(Task task) {
            this.task = task;
        }

        /** Prints the task's name, priority and the id of the executing thread. */
        @Override
        public void run() {
            System.out.println("Processing task: " + task.getName() + " with priority " + task.getPriority()
                    + " in thread: " + Thread.currentThread().getId());
        }

        /** @return the wrapped task */
        public Task getTask() {
            return task;
        }

        /** Delegates to {@link Task#compareTo(Task)}. */
        @Override
        public int compareTo(MyRunnable o) {
            return task.compareTo(o.task);
        }
    }

    /**
     * A queueable job that runs a {@link MyRunnable} and reports the outcome through
     * a {@link CompletableFuture}. It is comparable by the wrapped runnable's task
     * priority, so it can sit directly in a {@link PriorityBlockingQueue}.
     */
    static class PriorityFutureTask implements Runnable, Comparable<PriorityFutureTask> {
        private final MyRunnable delegate;
        private final CompletableFuture<Void> future = new CompletableFuture<>();

        /** @param delegate the work to run; also supplies the ordering */
        public PriorityFutureTask(MyRunnable delegate) {
            this.delegate = delegate;
        }

        /**
         * @return the future completed with {@code null} once {@link #run()} finishes
         *         normally, or completed exceptionally with whatever the delegate threw
         */
        public CompletableFuture<Void> getFuture() {
            return future;
        }

        /**
         * Runs the delegate and completes the future with the outcome. Does nothing
         * if the future was already completed or cancelled before the job started.
         */
        @Override
        public void run() {
            if (future.isDone()) {
                return;
            }
            try {
                delegate.run();
                future.complete(null);
            } catch (Throwable failure) {
                // Catch everything so that waiters on the future are always released,
                // even when the delegate fails with an Error.
                future.completeExceptionally(failure);
            }
        }

        /** Delegates to {@link MyRunnable#compareTo(MyRunnable)}. */
        @Override
        public int compareTo(PriorityFutureTask other) {
            return delegate.compareTo(other.delegate);
        }
    }

    /**
     * Submits three tasks with different priorities, attaches a cleanup callback to
     * each, waits for every task and its callback to finish, then shuts the pool down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Executor that runs callbacks in whichever thread triggers them
        Executor sameThreadExecutor = Runnable::run;
        // Order queued jobs by the priority of the task they wrap
        Comparator<Runnable> byTaskPriority = Comparator.comparing(job -> (PriorityFutureTask) job);
        PriorityBlockingQueue<Runnable> priorityQueue = new PriorityBlockingQueue<>(10, byTaskPriority);
        // Create a thread pool executor with the priority queue
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, priorityQueue);
        try {
            // Holds one future per task; each completes after that task's cleanup callback
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            // Enqueue tasks with different priorities
            List<Task> tasks = List.of(
                    new Task("Task1", 3),
                    new Task("Task2", 1),
                    new Task("Task3", 2)
            );
            for (Task task : tasks) {
                PriorityFutureTask job = new PriorityFutureTask(new MyRunnable(task));
                // execute() (unlike submit()) places the job itself in the queue, keeping it comparable
                executor.execute(job);
                // Attach a cleanup callback that runs in the completing thread
                CompletableFuture<Void> cleanedUp = job.getFuture().whenCompleteAsync((result, throwable) -> {
                    if (throwable == null) {
                        // Simulate cleanup logic
                        System.out.println("Cleanup completed for task: " + task.getName() + " with priority " + task.getPriority()
                                + " in thread: " + Thread.currentThread().getId());
                    } else {
                        // Handle exceptions if any
                        throwable.printStackTrace();
                    }
                }, sameThreadExecutor);
                // Track the callback stage so the wait below covers the cleanup as well
                futures.add(cleanedUp);
            }
            // Wait for all tasks and their cleanup callbacks to complete
            CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
            allOf.join();
        } finally {
            // Always release the non-daemon pool threads, otherwise a failure above would hang the JVM
            executor.shutdown();
        }
    }
}
@krmahadevan
Copy link
Author

This sample was the result of a conversation with ChatGPT. For the full conversation, check here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment