Skip to content

Instantly share code, notes, and snippets.

@n1chre
Last active January 8, 2017 16:46
Show Gist options
  • Save n1chre/f88cb542776b3500e0ffd7097d3109dc to your computer and use it in GitHub Desktop.
Save n1chre/f88cb542776b3500e0ffd7097d3109dc to your computer and use it in GitHub Desktop.
Running of parallel tasks that can be terminated if an acceptable value is produced.
package parallel;
import java.util.Collection;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* This class is used to execute tasks in parallel.
* If some task returns an acceptable result, any other computation is discarded.
* If none of them return an acceptable result, default value is returned.
*/
public abstract class Parallel<T> implements Observer {
/**
* Parallel AND computation of boolean computations.
* If any call produces a false, false is returned and all other calls are stopped.
*/
public static Parallel<Boolean> PARALLEL_AND() {
return new Parallel<Boolean>() {
@Override
protected boolean isAcceptable(Boolean b) {
return !b;
}
@Override
protected Boolean defaultValue() {
return true;
}
};
}
/**
* Parallel OR computation of boolean computations.
* If any call produces a true, true is returned and all other calls are stopped.
*/
public static Parallel<Boolean> PARALLEL_OR() {
return new Parallel<Boolean>() {
@Override
protected boolean isAcceptable(Boolean b) {
return b;
}
@Override
protected Boolean defaultValue() {
return false;
}
};
}
/**
* Task counter. When it goes down to zero, all tasks have finished
* and we return the default value
*/
private AtomicInteger ai;
private final ExecutorService executor;
private CompletableFuture<T> future;
/**
* Create a new parallel object.
*/
public Parallel() {
this.executor = Executors.newCachedThreadPool();
this.ai = new AtomicInteger(0);
}
/**
* @param t value to test
* @return true if it is an acceptable value, it will be returned
*/
protected abstract boolean isAcceptable(T t);
/**
* @return default value that is returned if no task produces an acceptable value
*/
protected abstract T defaultValue();
/**
* Override this method if you want to examine all produced values.
*
* @param t value
*/
protected void onEach(T t) {
}
/**
* Run all tasks in parallel
*
* @param callables these will be called in parallel
*/
public Future<T> run(Collection<Callable<T>> callables) {
if (ai.get() != 0) {
throw new RuntimeException("Already running");
}
ai.set(callables.size());
callables.parallelStream().map(Task::new).forEach(executor::submit);
executor.shutdown();
future = new CompletableFuture<>();
return future;
}
@Override
@SuppressWarnings("unchecked")
public void update(Observable o, Object arg) {
T t = (T) arg;
onEach(t);
boolean acceptable = isAcceptable(t);
int curr = ai.decrementAndGet();
if (!acceptable) {
if (curr <= 0) {
onFinish(defaultValue());
}
return;
}
onFinish(t);
}
/**
* Kills all other computations and notifies observers that it is done.
*
* @param t value to return
*/
private void onFinish(T t) {
ai.set(0);
executor.shutdownNow();
future.complete(t);
}
/**
* Implementation of a task
*/
private class Task extends Observable implements Runnable {
/**
* Callable that is called
*/
private Callable<T> callable;
/**
* Create a new task with given callable, add Parallel as an observer
*
* @param callable callable
*/
Task(Callable<T> callable) {
this.callable = callable;
addObserver(Parallel.this);
}
@Override
public void run() {
try {
T result = callable.call();
setChanged();
notifyObservers(result);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}
/* * * * * * * * * * * * * * * * * * * * * * * * * *
* *
* EXAMPLE *
* *
* * * * * * * * * * * * * * * * * * * * * * * * * */
// Each task sleeps for a given number of seconds and then returns that number.
private static Parallel<Integer> PAR_INT = new Parallel<Integer>() {
@Override
protected boolean isAcceptable(Integer arg) {
return arg == 3;
}
@Override
protected void onEach(Integer arg) {
System.out.println("Produced -> " + arg);
}
@Override
protected Integer defaultValue() {
return 42;
}
};
private static Callable<Integer> create(int x) {
return () -> {
TimeUnit.SECONDS.sleep(x);
return x;
};
}
public static void main(String[] args) {
Future<Integer> f = PAR_INT.run(Arrays.asList(create(1), create(3), create(5), create(2)));
try {
Integer i = f.get();
System.out.println("Final value -> " + i);
} catch (InterruptedException | ExecutionException ex) {
System.err.println("Error: " + ex.getLocalizedMessage());
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment