Skip to content

Instantly share code, notes, and snippets.

@ggalmazor
Last active February 8, 2019 17:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ggalmazor/b7394c7e8dd95d5d943ed9c1aa63d224 to your computer and use it in GitHub Desktop.
Save ggalmazor/b7394c7e8dd95d5d943ed9c1aa63d224 to your computer and use it in GitHub Desktop.
/**
 * A deferred unit of work that produces a value of type {@code T}.
 *
 * <p>A job wraps a {@link JobAwareSupplier}; nothing is executed until
 * {@link #launch(ExecutorService)} is called. Jobs can be combined with
 * {@link #allOf(Job, Job, Job)} and chained with {@link #thenApply(JobAwareFunction)}.
 *
 * @param <T> type of the value produced by this job
 */
public class Job<T> {
  private final JobAwareSupplier<T> block;

  private Job(JobAwareSupplier<T> block) {
    this.block = block;
  }

  /**
   * Creates a job that produces whatever the given supplier returns.
   *
   * @param supplier the work to run; receives the job's {@link JobStatus}
   * @param <U> type of the produced value
   * @return a new, not yet launched, job
   */
  public static <U> Job<U> supply(JobAwareSupplier<U> supplier) {
    return new Job<>(supplier);
  }

  /**
   * Creates a job that runs the given runnable and produces {@code null}.
   *
   * @param runnable the work to run; receives the job's {@link JobStatus}
   * @return a new, not yet launched, job
   */
  public static Job<Void> run(JobAwareRunnable runnable) {
    JobAwareSupplier<Void> adapted = jobStatus -> {
      runnable.run(jobStatus);
      return null;
    };
    return new Job<>(adapted);
  }

  /**
   * Combines three jobs into one that produces a {@link Triple} with their results.
   *
   * <p>The three blocks are invoked one after another (t, then u, then v) by the
   * thread running the combined job, and all of them receive the same
   * {@link JobStatus}.
   *
   * @param t first job
   * @param u second job
   * @param v third job
   * @return a new job producing the three results
   */
  public static <T, U, V> Job<Triple<T, U, V>> allOf(Job<T> t, Job<U> u, Job<V> v) {
    return new Job<>(jobStatus -> {
      T first = t.block.get(jobStatus);
      U second = u.block.get(jobStatus);
      V third = v.block.get(jobStatus);
      return new Triple<>(first, second, third);
    });
  }

  /**
   * Chains a transformation after this job.
   *
   * @param function receives the {@link JobStatus} and this job's result
   * @param <U> type of the transformed value
   * @return a new job that runs this job's block and then applies the function
   */
  public <U> Job<U> thenApply(JobAwareFunction<T, U> function) {
    return new Job<>(jobStatus -> {
      T input = block.get(jobStatus);
      return function.apply(jobStatus, input);
    });
  }

  /**
   * Submits this job to the given executor.
   *
   * <p>The {@link JobStatus} handed to the block reports the job as cancelled
   * once the executor has been shut down.
   *
   * @param executor executor that will run the job
   * @return a future completed with the job's result
   */
  public CompletableFuture<T> launch(ExecutorService executor) {
    return CompletableFuture.supplyAsync(() -> {
      JobStatus status = new JobStatus(executor::isShutdown);
      return block.get(status);
    }, executor);
  }
}
/**
 * A function that, besides its input value, receives the {@link JobStatus}
 * of the job it runs in. Used by {@code Job.thenApply} to transform a job's result.
 *
 * @param <T> type of the input value
 * @param <U> type of the produced value
 */
@FunctionalInterface
public interface JobAwareFunction<T, U> {
/**
 * Applies this function.
 *
 * @param jobStatus status of the running job
 * @param t the input value
 * @return the transformed value
 */
U apply(JobStatus jobStatus, T t);
}
/**
 * A runnable that receives the {@link JobStatus} of the job it runs in.
 * Used by {@code Job.run} to build jobs that produce no value.
 */
@FunctionalInterface
public interface JobAwareRunnable {
/**
 * Runs the work.
 *
 * @param jobStatus status of the running job
 */
void run(JobStatus jobStatus);
}
/**
 * A supplier that receives the {@link JobStatus} of the job it runs in.
 * This is the unit of work wrapped by {@code Job}.
 *
 * @param <T> type of the supplied value
 */
@FunctionalInterface
public interface JobAwareSupplier<T> {
/**
 * Produces a value.
 *
 * @param jobStatus status of the running job
 * @return the produced value
 */
T get(JobStatus jobStatus);
}
/**
 * Launches a list of {@link Job jobs} on a dedicated {@link ForkJoinPool} and
 * reports the outcome through the callbacks registered with
 * {@link #onSuccess(Consumer)} and {@link #onError(Consumer)}.
 *
 * <p>Callbacks are stored in plain lists, so they should be registered before
 * calling {@link #launch()}.
 *
 * @param <T> type of the value produced by each job
 */
public class JobsRunner<T> {
  private static final Logger log = LoggerFactory.getLogger(JobsRunner.class);
  private final List<Consumer<List<T>>> successCallbacks = new ArrayList<>();
  private final List<Consumer<Throwable>> errorCallbacks = new ArrayList<>();
  private final ExecutorService executor;
  private final List<Job<T>> jobs;

  private JobsRunner(List<Job<T>> jobs) {
    this.jobs = jobs;
    // Private pool mirroring the common pool's settings, so that cancel()
    // can shut it down without affecting the rest of the application.
    executor = new ForkJoinPool(
        ForkJoinPool.commonPool().getParallelism(),
        ForkJoinPool.commonPool().getFactory(),
        (thread, throwable) -> notifyError(throwable),
        ForkJoinPool.commonPool().getAsyncMode()
    );
  }

  /**
   * Creates a runner for the given jobs.
   *
   * @param jobs jobs to run
   * @return a new runner
   */
  @SafeVarargs
  public static <T> JobsRunner<T> of(Job<T>... jobs) {
    return JobsRunner.of(Stream.of(jobs));
  }

  /**
   * Creates a runner for the jobs in the given stream. The stream is consumed.
   *
   * @param jobs jobs to run
   * @return a new runner
   */
  public static <T> JobsRunner<T> of(Stream<Job<T>> jobs) {
    return JobsRunner.of(jobs.collect(toList()));
  }

  /**
   * Creates a runner for the jobs in the given list.
   *
   * @param jobs1 jobs to run
   * @return a new runner
   */
  public static <T> JobsRunner<T> of(List<Job<T>> jobs1) {
    return new JobsRunner<>(jobs1);
  }

  /**
   * Registers a callback that receives the cause when a job fails.
   *
   * @param errorCallback callback to register
   * @return this runner, for chaining
   */
  public JobsRunner<T> onError(Consumer<Throwable> errorCallback) {
    errorCallbacks.add(errorCallback);
    return this;
  }

  /**
   * Registers a callback that receives the results once every job has completed.
   *
   * @param successCallback callback to register
   * @return this runner, for chaining
   */
  public JobsRunner<T> onSuccess(Consumer<List<T>> successCallback) {
    successCallbacks.add(successCallback);
    return this;
  }

  /**
   * Launches all the jobs asynchronously and returns immediately.
   *
   * <p>When every job completes, the success callbacks receive the collected
   * results. When a job fails, the error callbacks receive the failure cause,
   * unless the runner has been cancelled, in which case the failure is only logged.
   *
   * @return this runner, so that callers can keep a handle to {@link #cancel()} it
   */
  public JobsRunner<T> launch() {
    CompletableFuture.runAsync(() -> {
      try {
        List<T> results = jobs.stream().map(job -> job.launch(executor)).collect(collectResult()).get();
        successCallbacks.forEach(c -> c.accept(results));
      } catch (InterruptedException e) {
        // Restore the interrupt flag so the pool's worker can observe it
        Thread.currentThread().interrupt();
        log.info("Job cancelled", e);
      } catch (ExecutionException e) {
        if (executor.isShutdown()) {
          // Failures raised while shutting down are a consequence of cancel()
          log.info("Job cancelled", e);
        } else {
          // A job failed: CompletableFuture captured the exception, so the pool's
          // uncaught exception handler never sees it and we must report it here
          notifyError(e.getCause() != null ? e.getCause() : e);
        }
      }
    }, executor);
    return this;
  }

  /**
   * Cancels the run by shutting down the underlying executor.
   */
  public void cancel() {
    executor.shutdownNow();
  }

  /** Hands the error to every registered error callback, or logs it when there is none. */
  private void notifyError(Throwable error) {
    if (errorCallbacks.isEmpty()) {
      log.error("Job failed and no error callback is registered", error);
      return;
    }
    errorCallbacks.forEach(callback -> callback.accept(error));
  }
}
/**
 * Cancellation status handed to the blocks of a running job.
 *
 * <p>The status itself holds no state: every query is delegated to the
 * supplier received at construction time.
 */
public class JobStatus extends TerminationFuture {
  private final Supplier<Boolean> isCancelledGetter;

  /**
   * @param isCancelledGetter supplier queried each time the cancellation state is requested
   */
  JobStatus(Supplier<Boolean> isCancelledGetter) {
    this.isCancelledGetter = isCancelledGetter;
  }

  /**
   * @return {@code true} while the underlying supplier does not report a cancellation
   */
  public boolean notCancelled() {
    boolean cancelled = isCancelledGetter.get();
    return !cancelled;
  }

  /**
   * @return {@code true} when the underlying supplier reports a cancellation
   */
  @Override
  public boolean isCancelled() {
    boolean cancelled = isCancelledGetter.get();
    return cancelled;
  }
}
/**
 * Launches one pull job per selected form and returns the runner driving them.
 *
 * <p>On failure, the error is logged and a {@code PullEvent.Failure} is published.
 * On success, the last pull cursor of every pulled form is stored and a
 * {@code PullEvent.Success} is published.
 *
 * @param forms the forms to pull; only the selected ones are processed
 * @return the already launched runner, which can be used to cancel the operation
 */
public JobsRunner<PullResult> pull(TransferForms forms) {
  // Share one HTTP client among all the jobs
  Http reusableHttp = http.reusingConnections();
  Stream<Job<PullResult>> pullJobs = forms.getSelectedForms().stream()
      .map(form -> PullForm.pullOne(reusableHttp, form));
  return JobsRunner.of(pullJobs)
      .onError(e -> {
        log.error("Error pulling forms", e);
        EventBus.publish(new PullEvent.Failure());
      })
      .onSuccess(results -> {
        results.forEach(result -> forms.setLastPullCursor(result.getForm(), result.getLastCursor()));
        EventBus.publish(new PullEvent.Success());
      })
      .launch();
}
/**
 * Builds the job that pulls one form: it downloads the blank form, obtains the
 * batches of instance IDs and downloads the blank form's attachments, and then
 * downloads every submission unless the job gets cancelled in the meantime.
 *
 * @param http HTTP client (not referenced by this method's body)
 * @param form the form to pull
 * @return a job producing the pull result, which carries the last cursor
 */
private Job<PullResult> pullOne(Http http, FormStatus form) {
  PullTracker tracker = new PullTracker(form);
  return allOf(
      supply(jobStatus -> downloadBlankForm(form, tracker)),
      supply(jobStatus -> getInstanceIdBatches(form, tracker, jobStatus)),
      run(jobStatus -> downloadBlankFormAttachments(form, tracker))
  ).thenApply((jobStatus, downloaded) -> {
    // The blank form XML is what the submission key generator is built from
    SubmissionKeyGenerator keyGenerator = SubmissionKeyGenerator.from(downloaded.get1());
    // Walk every instance ID of every batch, skipping the ones
    // that come up after the job has been cancelled
    downloaded.get2().stream()
        .flatMap(batch -> batch.getInstanceIds().stream())
        .filter(instanceId -> jobStatus.notCancelled())
        .forEach(instanceId -> downloadSubmissionAndMedia(form, tracker, instanceId, keyGenerator));
    // The result carries the last cursor found in the batches
    return PullResult.of(form, getLastCursor(downloaded.get2()));
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment