Skip to content

Instantly share code, notes, and snippets.

@mgladdish
Created October 10, 2014 06:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mgladdish/c4a5d3c4ff6e694b7e53 to your computer and use it in GitHub Desktop.
Save mgladdish/c4a5d3c4ff6e694b7e53 to your computer and use it in GitHub Desktop.
Concurrent Distinct Job Executor
package uk.co.odum.sky;
/**
 * A named unit of work that simulates processing by sleeping for a duration
 * chosen from its name.
 *
 * <p>The names {@code "First"}, {@code "Second"} and {@code "Third"} sleep for
 * 1000, 200 and 500 milliseconds respectively; every other name sleeps for
 * 750 milliseconds.
 */
public class Job {

    private final String jobName;

    /**
     * Creates a job.
     *
     * @param jobName the name identifying this job; it selects the simulated duration
     */
    public Job(String jobName) {
        this.jobName = jobName;
    }

    /**
     * Runs the job on the calling thread, blocking for the job's simulated duration.
     *
     * <p>If the calling thread is interrupted while sleeping, the job ends early,
     * the thread's interrupt status is restored so callers and executors can
     * observe it, and the intended duration is still returned.
     *
     * @return the sleep duration, in milliseconds, associated with this job's name
     */
    public Integer execute() {
        System.out.println("Starting job " + jobName);
        final int sleepTime = sleepTimeFor(jobName);
        try {
            Thread.sleep(sleepTime);
            System.out.println("Ending job " + jobName);
        } catch (InterruptedException ie) {
            // Thread.sleep clears the interrupt flag when it throws; re-assert it
            // rather than silently swallowing the interruption.
            Thread.currentThread().interrupt();
            System.err.println("Job " + jobName + " was interrupted before completing");
        }
        return sleepTime;
    }

    /** Maps a job name to its simulated duration in milliseconds. */
    private static int sleepTimeFor(String name) {
        switch (name) {
            case "First":
                return 1000;
            case "Second":
                return 200;
            case "Third":
                return 500;
            default:
                return 750;
        }
    }
}
package uk.co.odum.sky;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListSet;
public class JobExecutionService {
private Set alreadyRunning = new ConcurrentSkipListSet();
public CompletableFuture<Integer> execute(String jobName) {
if (alreadyRunning.contains(jobName)) throw new RuntimeException("already running!");
alreadyRunning.add(jobName);
return CompletableFuture.supplyAsync(() -> new Job(jobName).execute())
.whenComplete((r, x) -> alreadyRunning.remove(jobName));
}
}
package uk.co.odum.sky;
import org.junit.Test;
import java.util.concurrent.CompletableFuture;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
/**
 * Exercises {@link JobExecutionService}: duplicate names submitted in parallel,
 * the same name submitted in series, and distinct names submitted in parallel.
 */
public class JobExecutionServiceTest {

    /** Submitting a name while a job with that name is still running must throw. */
    @Test(expected = RuntimeException.class)
    public void runningTheSameJobInParallelShouldThrowAnException() {
        JobExecutionService executor = new JobExecutionService();

        CompletableFuture<Integer> original = executor.execute("Duplicate");
        CompletableFuture<Integer> duplicate = executor.execute("Duplicate");

        CompletableFuture<Void> both = CompletableFuture.allOf(original, duplicate);
        both.join();
    }

    /** Re-submitting a name once its previous run has completed yields the second run's value. */
    @Test
    public void runningTheSameJobInSeriesShouldReturnTheLastValue() {
        JobExecutionService executor = new JobExecutionService();

        CompletableFuture<Integer> firstRun = executor.execute("Series");
        CompletableFuture<Integer> secondRun = firstRun.thenCompose(ignored -> executor.execute("Series"));
        Integer lastValue = secondRun.join();

        assertThat(secondRun.isDone(), equalTo(true));
        // "Series" is not one of the specially handled names, so it reports the default duration.
        assertThat(lastValue, equalTo(750));
    }

    /** Jobs with different names may run side by side, each reporting its own value. */
    @Test
    public void runningDistinctJobsInParallelShouldReturnAllValues() {
        JobExecutionService executor = new JobExecutionService();

        CompletableFuture<Integer> firstJob = executor.execute("First");
        CompletableFuture<Integer> secondJob = executor.execute("Second");
        CompletableFuture<Integer> thirdJob = executor.execute("Third");
        CompletableFuture.allOf(firstJob, secondJob, thirdJob).join();

        boolean everyJobDone = firstJob.isDone() && secondJob.isDone() && thirdJob.isDone();
        assertThat("all jobs are completed", everyJobDone, equalTo(true));

        assertThat(firstJob.join(), equalTo(1000));
        assertThat(secondJob.join(), equalTo(200));
        assertThat(thirdJob.join(), equalTo(500));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment