Skip to content

Instantly share code, notes, and snippets.

@ykrkn
Created July 15, 2016 13:23
Show Gist options
  • Save ykrkn/a0f1f004faff48340719debc4a62a246 to your computer and use it in GitHub Desktop.
Save ykrkn/a0f1f004faff48340719debc4a62a246 to your computer and use it in GitHub Desktop.
CompletableFuture Actor chain
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import static java.util.concurrent.CompletableFuture.supplyAsync;
/**
* Created by sx on 26.02.16.
*/
public class Main {
static AtomicInteger cnt = new AtomicInteger();
static ExecutorService executor;
volatile static boolean RUNNING = true;
public static void main(String[] args) throws InterruptedException {
System.out.println("RUN");
executor = Executors.newFixedThreadPool(1);
executor.submit(() -> new Main().run()) ;
executor.shutdown();
}
Set<Wire> patchbay = Collections.synchronizedSet(new LinkedHashSet<>());
CountDownLatch latch = new CountDownLatch(1);
public void run(){
Stream.of("A", "B", "C", "D", "E", "F", "G", "H").parallel()
.forEach(Main.this::runFrameTask);
//System.out.println("BATCH COMPLETE");
//executor.shutdown();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("BATCH EXIT");
}
private void runFrameTask(String c) {
List<Actor> actors = new ArrayList<>();
int i = 100; while(i-->0) actors.add(new Actor());
CompletableFuture<String> CF = supplyAsync(() -> c);
CompletableFuture<String> CF_ = CF;
cnt.incrementAndGet();
for(Iterator<Actor> iter = actors.iterator(); iter.hasNext();){
Actor a = iter.next();
CF_ = CF_.thenComposeAsync(x -> supplyAsync(() -> a.execute(x)));
}
CF_.thenAccept(x -> Main.this.onFrameComplete(x))
.exceptionally(t -> {
t.printStackTrace();
cnt.decrementAndGet();
return null;
} );
//String s = CF.join();
//System.out.println(s);
}
private void onFrameComplete(String x) {
System.out.println("FRAME COMPLETE " + x);
if(cnt.decrementAndGet() == 0){
System.out.println("BATCH COMPLETE");
latch.countDown();
}
}
static class Wire {
public final Actor from;
public final Actor to;
public Wire(Actor from, Actor to) {
this.from = from;
this.to = to;
}
}
static class Actor {
public String execute(String value) {
String z = null;
assert z != null;
System.out.println(value);
try {
Thread.currentThread().sleep(11);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return value + '.';
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment