@ggalmazor
Last active February 16, 2019 14:25
Stopping background jobs: a Java concurrency refactoring case

Stopping background jobs: a Java parallel code refactoring case

I spend most of my working day helping to maintain and evolve the awesome OpenDataKit tool kit, which includes ODK Briefcase, a Java desktop application (Swing) that takes care of pulling, pushing and exporting forms.

Before continuing, I feel like I should give a disclaimer. It's a long post that has helped me put my thoughts together and fix in my head what I've learned. As you read on, you might spot some obvious and much simpler solution to a problem that I've missed. If that happens, please, please, write a comment! I'm not trying to write any foundational text. I'm just a guy figuring out how to do parallel jobs in Java, and I'm probably wrong.

The challenge

Briefcase interacts with servers that store blank forms and answered form submissions using an HTTP API to download them to a user's computer for post-processing.

Downloading one form involves many HTTP requests, especially when a form has multimedia questions, or when it's part of a big campaign with hundreds of thousands of answered form submissions.

Coordinating this interaction can be challenging, and there are several aspects that need to be taken care of, such as: background threads, limitations and restrictions of the Swing UI threading model, event broadcasting, exception management and escalation, break points for job cancellation, or sizing and limiting the use of HTTP connections and local computing resources.

The need for refactoring

This section explains the ins and outs of the rationale behind refactoring this particular piece of code. Since I finally went through with it, you may as well believe me when I say it was worth the effort, and skip to the next section ;)

Some of our users use OpenDataKit to gather data in campaigns that amount to hundreds of thousands of submissions, spanning many months, and they want to be able to download them incrementally (just what's new since the last time they sync'ed data), instead of having to download the whole submission set each time.

The server's API has the notion of "cursors" (pointers to the last pulled page of submission IDs), which Briefcase can store between sessions to resume a pull operation.

This new feature of "starting a download from the last submission downloaded" gave us the opportunity to dig deeper into Briefcase's code, which revealed some issues.

Issues with parallel code

The parallel code was based on Runnable, which made it impossible to return anything from it, and we needed it to return the last processed cursor so that we could store it for the next time.

This meant that we either needed to concurrently update some global state, or pass more stuff down so that the last couple of lines in the Runnable could store the last pulled cursor.

Neither option was desirable because the design was already too deeply coupled, and it already relied on global state, side effects, and implicit mechanics.
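To make the limitation concrete, here's a minimal sketch of both shapes. The `pullAndGetLastCursor` method and the cursor strings are hypothetical stand-ins, not Briefcase code: with a Runnable the cursor can only escape through shared state, while a Supplier-based job simply completes with it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class CursorReturnSketch {
  // Hypothetical stand-in for the real pull logic: it just builds a fake cursor
  static String pullAndGetLastCursor(String formId) {
    return "cursor-for-" + formId;
  }

  // Runnable-based: the only way to get the cursor out is a side effect on shared state
  static String withRunnable(ExecutorService executor, String formId) {
    AtomicReference<String> sharedState = new AtomicReference<>();
    Runnable job = () -> sharedState.set(pullAndGetLastCursor(formId));
    CompletableFuture.runAsync(job, executor).join();
    return sharedState.get();
  }

  // Supplier-based: the cursor is the value the job completes with
  static String withSupplier(ExecutorService executor, String formId) {
    return CompletableFuture.supplyAsync(() -> pullAndGetLastCursor(formId), executor).join();
  }

  // Runs both variants on a throwaway executor and returns their results
  static String[] demo() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      return new String[] {withRunnable(executor, "form-1"), withSupplier(executor, "form-1")};
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    for (String cursor : demo())
      System.out.println(cursor);
  }
}
```

Both variants produce the same cursor; the difference is only in how much shared machinery is needed to get it out.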

Threading and HTTP issues

For some reason, we were using a pool of threads to run our business logic, which in turn, used an Apache HTTP Components client that used a globally shared pool of HTTP connections. 

The goal here was to somehow limit Briefcase's ability to flood servers that might not be capable of dealing with a high volume of requests.

This was achieved by limiting the pool of threads to either just one thread (disable parallel downloads) or use "number of processors x 3" threads, which doesn't really address the issue.

Apart from that, the thread pool was just being used during the last part of the pull process, and several other HTTP requests were being left out. This means that different parts of the pull process were running under different threading contexts, which made error management and user feedback harder to manage.

Process cancellation solution

Complex jobs could be cancelled thanks to an object that held the "state of the current process". This object had to be shared across all the layers, starting from the UI, down to the last process at the lowest level of abstraction. The UI could trigger the cancellation, and low-level processes could short-circuit by checking whether the job had been cancelled or not.

This is not a bad idea. In fact, my solution has some parts that need to be aware of the "running job status" in order to short-circuit. Still, the existing implementation tightly coupled the outermost layer (the UI) to the lowest possible layers, and it felt like there was room for improvement on that front.

Java8 quirks & perks

First, we have to remember the main issue with parallelism in Java: cancelling a running thread is hard, almost impossible. I threw in the towel on that one long ago and, from the user's perspective, there's effectively no difference in letting what's already running complete. I'll let others fight that one :)
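As a small illustration of how little control we get over running code, here's a sketch (the class and latch names are made up for the example) showing that cancelling a CompletableFuture marks the future as cancelled but doesn't touch the thread running its body. The `mayInterruptIfRunning` flag is documented as having no effect in CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelDoesNotInterrupt {
  // Returns {future reports cancelled, task body ran to completion anyway}
  static boolean[] demo() {
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    CountDownLatch done = new CountDownLatch(1);
    AtomicBoolean finished = new AtomicBoolean(false);

    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
      started.countDown();
      try {
        release.await(); // would throw if the worker thread got interrupted
        finished.set(true);
      } catch (InterruptedException e) {
        // Not reached in this sketch: cancel() doesn't interrupt the worker
      } finally {
        done.countDown();
      }
    });

    try {
      started.await();      // make sure the body is already running
      future.cancel(true);  // completes the future with a CancellationException
      release.countDown();  // let the body carry on
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return new boolean[] {future.isCancelled(), finished.get()};
  }

  public static void main(String[] args) {
    boolean[] result = demo();
    System.out.println("cancelled=" + result[0] + ", body finished=" + result[1]);
  }
}
```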

Java8 came with a lot of great features, and today we're going to talk about the ForkJoinPool and the CompletableFuture classes. I won't delve into much detail, though, mainly because I can't :)

In any case, I'll say that we want to keep using a pool of threads, mainly because they can be configured with a capacity, scaling criteria, and a work queue.

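The ForkJoinPool (like other thread pool implementations) implements ExecutorService, and all ExecutorService objects can be shut down. The gentler shutdown() still runs whatever is already queued, so the interesting method is shutdownNow(), which has roughly the following effects:

  • Ongoing work is generally left to finish (threads may get interrupted, but code that ignores interruption carries on)
  • Queued pending work is dropped, i.e. it never gets launched
  • All new work requests are rejected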

This is just what we need.
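These effects can be checked with a small experiment. The sketch below uses a single-threaded executor instead of a ForkJoinPool, purely so that the queueing is deterministic (that swap is my assumption, and the two pool types differ in their details). The first task stands in for a download that ignores interruption.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownNowSketch {
  // Returns {tasks that completed, queued tasks that never started, 1 if new work was rejected}
  static int[] demo() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    AtomicInteger completed = new AtomicInteger();

    // Ongoing work: keeps going even if its thread gets interrupted
    executor.execute(() -> {
      started.countDown();
      boolean released = false;
      while (!released) {
        try {
          release.await();
          released = true;
        } catch (InterruptedException ignored) {
          // Swallow the interruption and keep waiting
        }
      }
      completed.incrementAndGet();
    });
    // Pending work: sits in the queue behind the first task
    executor.execute(completed::incrementAndGet);
    executor.execute(completed::incrementAndGet);

    try {
      started.await();
      List<Runnable> neverStarted = executor.shutdownNow();
      int rejected = 0;
      try {
        executor.execute(completed::incrementAndGet);
      } catch (RejectedExecutionException e) {
        rejected = 1;
      }
      release.countDown();
      executor.awaitTermination(5, TimeUnit.SECONDS);
      return new int[] {completed.get(), neverStarted.size(), rejected};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    int[] r = demo();
    System.out.println("completed=" + r[0] + ", never started=" + r[1] + ", rejected=" + r[2]);
  }
}
```

Only the task that was already running completes; the two queued tasks are handed back by shutdownNow() without ever running, and the late submission is rejected.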

Why the ForkJoinPool

The ForkJoinPool is sized by the JVM for you. This is super nice.

The JVM spins up a ForkJoinPool (the common pool) to run all the CompletableFuture and parallel Stream work you create without naming an executor, which sounds great at first until you realize that the common pool won't shut down. We can create a new one, though.
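A quick sketch of the difference (the class name and the parallelism of 2 are arbitrary choices for the example): shutdown requests are documented as having no effect on the common pool, while a pool we create ourselves shuts down as expected.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolShutdownSketch {
  // Returns {common pool is shut down, our own pool is shut down}
  static boolean[] demo() {
    ForkJoinPool common = ForkJoinPool.commonPool();
    common.shutdownNow(); // no effect on the common pool's execution state

    ForkJoinPool own = new ForkJoinPool(2);
    own.shutdownNow();

    return new boolean[] {common.isShutdown(), own.isShutdown()};
  }

  public static void main(String[] args) {
    boolean[] r = demo();
    System.out.println("common shut down: " + r[0] + ", own shut down: " + r[1]);
  }
}
```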

Why CompletableFuture

CompletableFuture instances are composable. How nice is that?!

One way to stop depending on some shared object to stop a process is to break it down into tiny bits and use the ForkJoinPool as an indirect cancellation mechanism. The pool has only so much computing bandwidth, and when it's saturated, new jobs get in a queue. When the pool shuts down, any queued work won't be launched, effectively cancelling the whole operation. In this approach, it's key that all the queued tasks are small.

Not all programs have a linear flow that can be modelled after a simple queue, though. We often need to launch parallel jobs and aggregate their partial results into one value before passing it down the chain. When this happens, it's not so easy to either break a process into tiny computation bits or model the workflow as a sequential, flat queue.

CompletableFuture objects can be composed to model complex workflows like the one we need to solve:

Data flow diagram of the operation of pulling one form

The CompletableFuture.allOf() method lets you take an array of CompletableFuture instances and get a new CompletableFuture that will be completed when all its component instances are complete. Since we need to provide an array, infinite (unbounded) Streams are ruled out.

This is key in our case because the process that gets the submission ID pages is unbounded: each page request tells us whether we need to ask for the next page or not.
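For reference, this is roughly what the bounded case looks like: a minimal sketch with three trivial futures standing in for real downloads. Note that the whole list has to exist before allOf() can be called, which is exactly what an open-ended sequence of page requests can't give us.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfSketch {
  static List<Integer> demo() {
    // A fixed, fully known set of futures
    List<CompletableFuture<Integer>> futures = Arrays.asList(
        CompletableFuture.supplyAsync(() -> 1),
        CompletableFuture.supplyAsync(() -> 2),
        CompletableFuture.supplyAsync(() -> 3)
    );

    // allOf() only signals completion (CompletableFuture<Void>), so the values
    // have to be read back from the original futures once it's done
    CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    return all
        .thenApply(ignored -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()))
        .join();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [1, 2, 3]
  }
}
```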

I haven't figured out how to deal with this just using composition. Instead, I've built upon the current job cancellation system (passing a state object), while trying to make the design flatter and more explicit (see the second solution below).

The first solution

My first working implementation involved coupling my code with the ExecutorService. At this point, my goal was to just migrate the code to ForkJoinPool, CompletableFuture (and a non-globally shared HTTP client).

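import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
// Logger, Http, RemoteServer, TransferForms, FormStatus, Pair and friends come from Briefcase and its dependencies

public final class Source {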
  private static final Logger log = LoggerFactory.getLogger(Source.class);
  private final Http http;
  private final RemoteServer server;
  
  public Source(Http http, RemoteServer server) {
    this.http = http;
    this.server = server;
  }

  public ExecutorService pull(TransferForms forms) {
    ExecutorService executor = new ForkJoinPool(
        ForkJoinPool.commonPool().getParallelism(),
        ForkJoinPool.commonPool().getFactory(),
        (thread, throwable) -> log.error("Error pulling forms", throwable),
        ForkJoinPool.commonPool().getAsyncMode()
    );

    forms.map(form -> pullOne(http, executor, form))
        .collect(CompletableFutureHelpers.collectResult())
        .thenAcceptAsync(results -> {
          for(PullResult result : results)
            forms.setLastPullCursor(result.getForm(), result.getLastCursor());
        }, executor)
        .exceptionally(throwable -> {
          executor.shutdownNow();
          log.error("Error pulling forms", throwable);
          return null; // .exceptionally needs a Function ¯\_(ツ)_/¯
        });

    return executor;
  }

  private CompletableFuture<PullResult> pullOne(Http http, ExecutorService executor, FormStatus form) {
    CompletableFuture<String> blankFormFuture = CompletableFuture.supplyAsync(() -> downloadBlankForm(form), executor);
    CompletableFuture<List<InstanceIdBatch>> instanceIdBatchesFuture = CompletableFuture.supplyAsync(() -> getInstanceIdBatches(form), executor);
    CompletableFuture<Void> blankFormAttachmentsFuture = CompletableFuture.runAsync(() -> downloadBlankFormAttachments(form), executor);

    return CompletableFuture.allOf(
        blankFormFuture,
        instanceIdBatchesFuture,
        blankFormAttachmentsFuture
    ).thenApplyAsync(__ -> {
      try {
        return Pair.of(
            blankFormFuture.get(),
            instanceIdBatchesFuture.get()
        );
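      } catch (InterruptedException | ExecutionException e) {
        log.error("Job cancelled", e);
        // Propagate the failure; returning null would only cause an NPE in the next stage
        throw new java.util.concurrent.CompletionException(e);
      }
    }, executor).thenApplyAsync(pair -> { // same executor as every other stage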
      List<InstanceIdBatch> batches = pair.getRight();

      // Extract all the instance IDs from all the batches and download each instance
      batches.stream().flatMap(batch -> batch.getInstanceIds().stream()).forEach(instanceId -> {
        // Skip the remaining downloads once the executor has been shut down
        if (!executor.isShutdown())
          downloadSubmissionAndMedia(form, instanceId);
      });

      // Return the form and the last processed cursor for downstream processing
      return PullResult.of(form, getLastCursor(batches));
    }, executor);
  }
}

Key points of this implementation:

  • Users of the Source.pull() method will receive an ExecutorService, which can be shut down.
  • We create a ForkJoinPool copying the parameters from the common pool, except the error handler, which we want to implement to improve error management in our business logic (not important at this moment).
  • The CompletableFutureHelpers.collectResult() collector takes care of lifting all the results inside all the individual CompletableFuture<Result> and merging them into a CompletableFuture<List<Result>>.
  • The CompletableFuture.allOf() method returns a CompletableFuture<Void>, which forces us to do the next awkward .thenApplyAsync() to extract their values and deal with the InterruptedException.

Notice how we check the status of the executor to prevent further downloads of submissions with if (!executor.isShutdown()). This could have been solved with another nested CompletableFuture.allOf() because the submission ID list is not infinite, but I wanted to try this as a small-scale test in preparation for the changes I had to make to the block that gets all the submission ID pages.

These are the main issues with this design:

  • I have replaced passing a shared state object through all the layers with passing the ExecutorService, which is much worse. Not only is it riskier, but it's not even an object of the domain.
  • Even though all parts of the pull process use the same artifacts (parallel execution of jobs, HTTP connection, telemetry, logging, ...), the part that gets all the submission ID pages is not cancellable i.e. it doesn't check whether the executor has been shut down before continuing with the next HTTP request.

The bottom line is that we're hitting some of the issues with parallel code in Java8. Creating a CompletableFuture with supplyAsync() or runAsync() immediately schedules the underlying computation, and the only thing you can choose is the Executor that will run it.

This looks like it prevents us from statically declaring a process in isolation from the context where it should run, but we could leverage functions for that. The only problem is that we need to make those functions somehow aware of the status of the running jobs without resorting to globally shared or passed-through objects.
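A minimal sketch of the difference, under my own assumptions: the counter and the same-thread executor are only there to make the example deterministic. Calling supplyAsync() runs the work right away, while wrapping the same call in a function of the Executor declares it without launching anything.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class EagerVsDeferredSketch {
  // Returns the number of executed computations after each step
  static int[] demo() {
    AtomicInteger runs = new AtomicInteger();
    Executor sameThread = Runnable::run; // runs tasks in the calling thread

    // Declaring the computation as a function of the executor launches nothing
    Function<Executor, CompletableFuture<Integer>> deferred =
        executor -> CompletableFuture.supplyAsync(runs::incrementAndGet, executor);
    int afterDeclaring = runs.get();

    // supplyAsync() hands the work to the executor as soon as it's called
    CompletableFuture.supplyAsync(runs::incrementAndGet, sameThread).join();
    int afterEager = runs.get();

    // The deferred computation only runs once it's given an execution context
    deferred.apply(sameThread).join();
    int afterLaunching = runs.get();

    return new int[] {afterDeclaring, afterEager, afterLaunching};
  }

  public static void main(String[] args) {
    int[] r = demo();
    System.out.println(r[0] + ", " + r[1] + ", " + r[2]); // prints 0, 1, 2
  }
}
```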

The (second) solution

The solution that I actually went with involves a more crowded design:

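// JDK imports needed by the classes below (each class would live in its own file)
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class Source {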
  private static final Logger log = LoggerFactory.getLogger(Source.class);
  private final Http http;
  private final RemoteServer server;

  Source(Http http, RemoteServer server) {
    this.http = http;
    this.server = server;
  }

  public static JobsRunner<PullResult> pull(TransferForms forms) {
    return new JobsRunner<PullResult>()
        .onError(e -> log.error("Error pulling forms", e))
        .onSuccess(results -> {
          for(PullResult result : results)
            forms.setLastPullCursor(result.getForm(), result.getLastCursor());
        })
        .launch(forms.map(Source::pullOne));
  }

  private static Job<PullResult> pullOne(FormStatus form) {
    return Job.allOf(
        Job.supply(runnerStatus -> downloadBlankForm(form)),
        Job.supply(runnerStatus -> getInstanceIdBatches(form, runnerStatus)),
        Job.run(runnerStatus -> downloadBlankFormAttachments(form))
    ).thenApply((runnerStatus, t) -> {
      SubmissionKeyGenerator subKeyGen = SubmissionKeyGenerator.from(t.get1());

      t.get2().stream()
          .flatMap(batch -> batch.getInstanceIds().stream())
          .forEach(instanceId -> {
            if (runnerStatus.isStillRunning())
              downloadSubmissionAndMedia(form, instanceId, subKeyGen);
          });

      return PullResult.of(form, getLastCursor(t.get2()));
    });
  }
}

public class Job<T> {
  private final Function<RunnerStatus, T> runnerAwareSupplier;

  private Job(Function<RunnerStatus, T> runnerAwareSupplier) {
    this.runnerAwareSupplier = runnerAwareSupplier;
  }

  public static <U> Job<U> supply(Function<RunnerStatus, U> runnerAwareSupplier) {
    return new Job<>(runnerAwareSupplier);
  }

  public static Job<Void> run(Consumer<RunnerStatus> runnerAwareRunnable) {
    return new Job<>(runnerStatus -> {
      runnerAwareRunnable.accept(runnerStatus);
      return null;
    });
  }

  public static <T, U, V> Job<Triple<T, U, V>> allOf(Job<T> t, Job<U> u, Job<V> v) {
    return new Job<>(runnerStatus -> new Triple<>(
        t.runnerAwareSupplier.apply(runnerStatus),
        u.runnerAwareSupplier.apply(runnerStatus),
        v.runnerAwareSupplier.apply(runnerStatus)
    ));
  }

  public <U> Job<U> thenApply(BiFunction<RunnerStatus, T, U> runnerAwareFunction) {
    return new Job<>(runnerStatus -> runnerAwareFunction.apply(runnerStatus, runnerAwareSupplier.apply(runnerStatus)));
  }

  CompletableFuture<T> launch(ExecutorService executor) {
    return CompletableFuture.supplyAsync(() -> runnerAwareSupplier.apply(new RunnerStatus(executor::isShutdown)), executor);
  }
}

public class JobsRunner<T> {
  private static final Logger log = LoggerFactory.getLogger(JobsRunner.class);
  private final List<Consumer<List<T>>> successCallbacks = new ArrayList<>();
  private final List<Consumer<Throwable>> errorCallbacks = new ArrayList<>();
  private final ExecutorService executor;

  public JobsRunner() {
    executor = new ForkJoinPool(
        ForkJoinPool.commonPool().getParallelism(),
        ForkJoinPool.commonPool().getFactory(),
        (thread, throwable) -> errorCallbacks.forEach(c -> c.accept(throwable)),
        ForkJoinPool.commonPool().getAsyncMode()
    );
  }

  public JobsRunner<T> onError(Consumer<Throwable> errorCallback) {
    errorCallbacks.add(errorCallback);
    return this;
  }

  public JobsRunner<T> onSuccess(Consumer<List<T>> successCallback) {
    successCallbacks.add(successCallback);
    return this;
  }

  public JobsRunner<T> launch(Stream<Job<T>> jobs) {
    CompletableFuture.runAsync(() -> {
      try {
        List<T> results = jobs
            .map(job -> job.launch(executor))
            .collect(CompletableFutureHelpers.collectResult())
            .get();
        successCallbacks.forEach(c -> c.accept(results));
        executor.shutdown();
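      } catch (InterruptedException e) {
        // cancel() calls shutdownNow(), which may interrupt the worker blocked on get()
        Thread.currentThread().interrupt();
        log.info("Job cancelled", e);
      } catch (ExecutionException e) {
        // A failure inside a CompletableFuture completes it exceptionally and never reaches
        // the pool's uncaught exception handler, so report it to the error callbacks here
        errorCallbacks.forEach(c -> c.accept(e.getCause()));
      }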
    }, executor);
    return this;
  }

  public void cancel() {
    executor.shutdownNow();
  }
}

public class RunnerStatus {
  private final Supplier<Boolean> stoppedProbe;

  RunnerStatus(Supplier<Boolean> stoppedProbe) {
    this.stoppedProbe = stoppedProbe;
  }

  public boolean isStillRunning() {
    return !stoppedProbe.get();
  }
}

public final class CompletableFutureHelpers {
  static <X, T extends CompletableFuture<X>> Collector<T, ?, CompletableFuture<List<X>>> collectResult() {
    return Collectors.collectingAndThen(
        Collectors.toList(),
        ts -> CompletableFuture
            .allOf(ts.toArray(new CompletableFuture[0]))
            .thenApply(v -> ts.stream().map(CompletableFuture::join).collect(Collectors.toList()))
    );
  }
}

The key points are:

  • At a high-level of abstraction, the Source.pull() method is the only one that knows about the context where the process will run. It declares a Stream of jobs, which gets launched with a JobsRunner.
  • The Source.pullOne() method doesn't know about the context where it will be run. For what it's worth, it could even be run in the main thread.
  • The only coupling point is that all the functions that compose the pull operation are aware of the status of the runner that's executing them. The improvement here is that we've made it explicit thanks to the types involved, and that now the computation pieces are much smaller, easier to follow and understand.
  • Each Job will generate a fresh RunnerStatus for the supplier it's wrapping, using the executor that it receives when it's launched.
  • We can compose three heterogeneous jobs into a job holding the product of their three corresponding types. This could be extended to any other number of jobs.
  • The JobsRunner knows how to launch a list of job objects in parallel.

This implementation, although arguably more crowded, is flatter, more explicit, takes care of all the main issues from the original code, and can be (and will be) reused in other parts of Briefcase, which has been a longstanding task for quite some time.

I'm generally happy with it, but there are some things that I would like to eventually improve:

  • I don't like the RunnerStatus object. I'd love to be able to use regular functions, but I don't see how to work around the iterative download of submission ID batches yet.
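To show why the status probe is hard to get rid of, here's a rough sketch of the iterative page download. Everything in it is hypothetical (the `Page` type, page numbers instead of real server cursors, and the fake server in `demo()`): each response decides whether another request is needed, so the only natural place to stop is a check between requests.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Function;

class PagedFetchSketch {
  // Hypothetical page of submission IDs, plus whether the server has more pages
  static final class Page {
    final List<String> ids;
    final boolean hasMore;

    Page(List<String> ids, boolean hasMore) {
      this.ids = ids;
      this.hasMore = hasMore;
    }
  }

  // Keeps asking for pages until the server says there are no more or the runner has stopped
  static List<String> fetchAll(BooleanSupplier stillRunning, Function<Integer, Page> fetchPage) {
    List<String> ids = new ArrayList<>();
    int pageNumber = 0;
    boolean hasMore = true;
    while (hasMore && stillRunning.getAsBoolean()) {
      Page page = fetchPage.apply(pageNumber++);
      ids.addAll(page.ids);
      hasMore = page.hasMore;
    }
    return ids;
  }

  static List<String> demo() {
    AtomicInteger requests = new AtomicInteger();
    // Fake server that always has more pages; the probe "cancels" after two requests
    return fetchAll(
        () -> requests.get() < 2,
        pageNumber -> {
          requests.incrementAndGet();
          return new Page(Collections.singletonList("id-" + pageNumber), true);
        }
    );
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [id-0, id-1]
  }
}
```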

Next steps

This design is quite limited because we can't statically model a job that spreads over n jobs and merges their results into one, or define complex async patterns like a race of jobs.

Currently, the composed jobs are opaque in the sense that the runner doesn't know about their internal structure and execution needs.

The design should probably evolve into a light DSL capable of statically defining these missing features, and of producing a non-opaque composed job that the runner would be able to expand and glue together by providing the right execution context.

Conclusion

It's paradoxical how I feel I've learned a lot about parallel code in Java8 and I know there's so much left to learn yet. I've written this post as an exercise to put my thoughts in order after some very intense days working with this codebase, and hoping that I can get some feedback from any courageous reader that might reach these lines.
