Skip to content

Instantly share code, notes, and snippets.

@ykrkn
Created July 18, 2016 06:15
Show Gist options
  • Save ykrkn/ccdc907b42518777c42188c101ea1864 to your computer and use it in GitHub Desktop.
Save ykrkn/ccdc907b42518777c42188c101ea1864 to your computer and use it in GitHub Desktop.
CompletableFuture Actor Graph
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.concurrent.CompletableFuture.completedFuture;
/**
* Created by sx on 26.02.16.
*/
public class Main {
public static void main(String[] args)
{
System.out.println("RUN");
Graph g = createGraphEtl(); // Don't work createGraphCycled();
List<String> src = Stream.of("A").collect(Collectors.toList());
CountDownLatch latch = new CountDownLatch(src.size());
for(String c : src){
GraphExecutor ge = new GraphExecutor(g);
ge.execute(c).thenApply((x) -> {
System.out.println("BATCH COMPLETE");
latch.countDown();
return null;
}).exceptionally(t -> {
if(t instanceof CompletionException){
((CompletionException) t).getCause().printStackTrace();
} else {
((Throwable)t).printStackTrace();
}
latch.countDown();
return null;
} );
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("EXIT");
}
private static Graph createGraphCycled() {
Graph g = new Graph();
Actor e0 = new Actor("E0");
Actor e1 = new Actor("E1");
Actor e2 = new Actor("E2");
Actor e3 = new Actor("E3");
g.add(e0);
g.add(e1);
g.add(e2);
g.add(e3);
g.connect(e0, e1);
g.connect(e1, e2);
g.connect(e2, e1);
g.connect(e2, e3);
return g;
}
private static Graph createGraphEtl(){
Graph g = new Graph();
Actor e1 = new Actor("E1");
Actor e2 = new Actor("E2");
Actor e3 = new Actor("E3");
Actor t1 = new Actor("T1");
Actor t2 = new Actor("T2");
Actor d1 = new Actor("D1");
Actor d2 = new Actor("D2");
Actor d3 = new Actor("D3");
g.add(e1);
g.add(e2);
g.add(e3);
g.add(t1);
g.add(t2);
g.add(d1);
g.add(d2);
g.add(d3);
g.connect(e1, t1);
g.connect(e2, t1);
g.connect(e3, t2);
g.connect(t1, d1);
g.connect(t2, d2);
g.connect(t2, d3);
return g;
}
private static Graph createGraphChain100(){
Graph g = new Graph();
Actor a = new Actor("A0");
// Actor first = a;
Actor p = null;
int i = 100;
while(i-->0) {
g.add(a);
if(p != null) g.connect(p, a);
p = a; a = new Actor("A"+i);
}
return g;
}
static class GraphExecutor {
private final Graph g;
private final Map<Actor, CompletableFuture> promises = new HashMap<>();
public GraphExecutor(Graph g){
this.g = g;
}
void putPromise(Actor v, CompletableFuture cf){
promises.put(v, cf);
}
CompletableFuture getPromiseFor(Actor v){
return promises.get(v);
}
private CompletableFuture execute(String c) {
for(Actor a : g.vertices) {
List<Actor> inputs = g.getInwards(a);
CompletableFuture promise;
if(inputs.size() == 0){
promise = completedFuture(c).thenComposeAsync(a::promise);
} else if (inputs.size() == 1){
Actor prev = inputs.get(0);
promise = getPromiseFor(prev).thenComposeAsync(a::promise);
} else {
// More than 1 previous - get ANY OF previous promises
List<CompletableFuture> tailPromises = inputs.stream()
.map(a2 -> getPromiseFor(a2)).collect(Collectors.toList());
promise = CompletableFuture.anyOf(tailPromises.toArray(new CompletableFuture[]{}));
promise = promise.thenComposeAsync(a::promise);
}
putPromise(a, promise);
}
List<CompletableFuture> tailPromises = g.vertices.stream()
.filter(a -> 0 == g.getOutwards(a).size())
.map(a -> getPromiseFor(a)).collect(Collectors.toList());
// Combine all tail promises into final ALL OF completed
return CompletableFuture.allOf(tailPromises.toArray(new CompletableFuture[]{}));
}
}
static class Graph {
private List<Actor> vertices = new LinkedList<>();
private List<Edge> edges = new LinkedList<>();
boolean add(Actor v){
if (!vertices.contains(v)) {
this.vertices.add(v);
return true;
}
return false;
}
Edge connect(Actor from, Actor to){
for (Edge e : edges) {
if (e.from == from && e.to == to) {
return e;
}
}
Edge e = new Edge(from, to);
edges.add(e);
return e;
}
List<Actor> getInwards(Actor to){
List<Actor> result = new ArrayList<>();
for(Edge e : edges){
if(e.to == to) result.add(e.from);
}
return result;
}
List<Actor> getOutwards(Actor from){
List<Actor> result = new ArrayList<>();
for(Edge e : edges){
if(e.from == from) result.add(e.to);
}
return result;
}
}
static class Edge {
public final Actor from;
public final Actor to;
public Edge(Actor from, Actor to) {
this.from = from;
this.to = to;
//System.out.println(from + " -> " + to);
}
}
static class Actor<T, R> {
String N;
public Actor(String name) {
N = name;
}
public CompletableFuture promise(Object x) {
return CompletableFuture.supplyAsync(() -> execute((T)x));
}
public R execute(T value) throws RuntimeException {
String z = null;
assert z != null; // WTF ???
//if(is()) throw new RuntimeException("EX");
System.out.println(N + " " + value);
try {
Thread.currentThread().sleep(111);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return (R) (value.toString() + '.');
}
boolean is(){ return true; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment