Skip to content

Instantly share code, notes, and snippets.

@ggalmazor
Last active February 8, 2019 16:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ggalmazor/ec1a7e580e8ae9755fc56a3db0ba98ef to your computer and use it in GitHub Desktop.
Save ggalmazor/ec1a7e580e8ae9755fc56a3db0ba98ef to your computer and use it in GitHub Desktop.
Medium - Java Concurrency - Block that launches the pull process
public ExecutorService pull(List<FormStatus> forms) {
ExecutorService executor = new ForkJoinPool(
commonPool().getParallelism(),
commonPool().getFactory(),
this::handleError,
commonPool().getAsyncMode()
);
forms.stream()
.map(form -> pullOne(form, executor))
.collect(collectResult()) // List<CompletableFuture<T>> -> CompletableFuture<List<T>>
.thenAcceptAsync(this::handleSuccess, executor)
.exceptionally(throwable -> {
executor.shutdownNow();
this.handleError(throwable);
return null; // .exceptionally needs a Function ¯\_(ツ)_/¯
});
return executor;
}
private CompletableFuture<Result> pullOne(FormStatus form, Http http, ExecutorService executor) {
PullTracker tracker = new PullTracker(form);
CompletableFuture<String> blankFormFuture = CompletableFuture.supplyAsync(() -> downloadBlankForm(form, tracker, http), executor);
CompletableFuture<List<InstanceIdBatch>> instanceIdBatchesFuture = CompletableFuture.supplyAsync(() -> getInstanceIdBatches(form, tracker, http), executor);
CompletableFuture<Void> blankFormAttachmentsFuture = CompletableFuture.runAsync(() -> downloadBlankFormAttachments(form, tracker, http), executor);
return CompletableFuture.allOf(
blankFormFuture,
instanceIdBatchesFuture,
blankFormAttachmentsFuture
).thenApplyAsync(__ -> {
try {
return Pair.of(
blankFormFuture.get(),
instanceIdBatchesFuture.get()
);
} catch (InterruptedException | ExecutionException e) {
log.error("Job cancelled");
return null;
}
}).thenApplyAsync(pair -> {
List<InstanceIdBatch> batches = pair.getRight();
// Extract all the instance IDs from all the batches and download each instance
batches.stream().flatMap(batch -> batch.getInstanceIds().stream()).forEach(instanceId -> {
if (!executor.isShutdown())
downloadSubmissionAndMedia(form, tracker, instanceId, http);
});
// Return the form and the last processed cursor for downstream processing
return Result.of(form, getLastCursor(batches));
}, executor);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment