Skip to content

Instantly share code, notes, and snippets.

@vthacker
Last active October 8, 2023 01:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vthacker/5d153de1349eee96683d1dd18824a7b2 to your computer and use it in GitHub Desktop.
Save vthacker/5d153de1349eee96683d1dd18824a7b2 to your computer and use it in GitHub Desktop.
Fan out queries and aggregate the results. Some queries can take too long, and some might throw an exception.
class StructuredConcurrencyTest {
public static void main(String[] args) {
Test test = new Test();
test.query();
}
}
/**
 * Demo that fans out several queries inside a {@code CollectingScope}, waits for them up to a
 * short deadline, and prints the results of the queries that completed successfully.
 */
class Test {
    /**
     * Forks four queries, waits up to 10 ms for them, and prints the successful results joined
     * as {@code { a, b, ... }}.
     *
     * <p>A timeout while joining is reported and otherwise ignored so that the results collected
     * so far can still be printed. Any other exception is printed; if the calling thread was
     * interrupted, its interrupt status is restored before returning.
     */
    public void query() {
        try (CollectingScope<String> scope = new CollectingScope<>()) {
            scope.fork(() -> getQuery("query1", -1));
            // this query should time out since it sleeps longer than the join deadline
            scope.fork(() -> getQuery("query2", 50));
            // this query is hardcoded to throw an exception
            scope.fork(() -> getQuery("query3", -1));
            scope.fork(() -> getQuery("query4", -1));
            try {
                scope.joinUntil(Instant.now().plusMillis(10));
            } catch (TimeoutException e) {
                System.out.println("Some queries took to long so we cancelled it");
            }
            // The ordering is based on whichever task completes first
            String results = scope.completedSuccessfully()
                    .collect(Collectors.joining(", ", "{ ", " }"));
            System.out.println(results);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            System.out.println(e);
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    /**
     * Simulates running a query, optionally sleeping first.
     *
     * @param query name of the query; {@code "query3"} always fails
     * @param waitForMs milliseconds to sleep before answering; values {@code <= 0} skip the sleep
     * @return {@code "result_"} followed by the query name
     * @throws IllegalArgumentException if {@code query} is {@code "query3"}
     * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is
     *     interrupted; the thread's interrupt status is restored before throwing
     */
    public String getQuery(String query, long waitForMs) {
        if (waitForMs > 0) {
            try {
                Thread.sleep(waitForMs);
            } catch (InterruptedException e) {
                // Thread.sleep clears the flag when it throws; set it again before wrapping.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        if ("query3".equals(query)) {
            throw new IllegalArgumentException("Wrong query parameters");
        }
        return "result_" + query;
    }
}
/**
 * A {@code StructuredTaskScope} that remembers the result of every subtask that finished in the
 * {@code SUCCESS} state, so the results can be read back after joining.
 *
 * @param <T> result type of the forked subtasks
 */
class CollectingScope<T> extends StructuredTaskScope<T> {
    /** Results of successful subtasks, in the order they were recorded. */
    private final Queue<T> results = new ConcurrentLinkedQueue<>();

    /**
     * Records the subtask's result when its state is {@code SUCCESS}; any other state is ignored.
     *
     * @param subtask the subtask being reported
     */
    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        if (subtask.state() != Subtask.State.SUCCESS) {
            return;
        }
        results.add(subtask.get());
    }

    /**
     * Delegates to the superclass {@code join()} and narrows the return type to this class.
     *
     * @return this scope
     * @throws InterruptedException if propagated by the superclass {@code join()}
     */
    @Override
    public CollectingScope<T> join() throws InterruptedException {
        super.join();
        return this;
    }

    /**
     * Streams the results recorded so far.
     *
     * <p>No ownership/join check is performed here. Per the original author's note, calling
     * {@code ensureOwnerAndJoined()} raises {@code IllegalStateException} after a timed-out join,
     * because {@code lastJoinCompleted} is by design not updated on timeout.
     * TODO: find a supported way to read the successful results after a timeout.
     *
     * @return a stream over the recorded successful results
     */
    public Stream<T> completedSuccessfully() {
        return results.stream();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment