Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Essence of Future
/* / \____ _ _ ____ ______ / \ ____ __ _ _____
* / / \/ \ / \/ \ / /\__\/ // \/ \ / / _ \ Javaslang
* _/ / /\ \ \/ / /\ \\__\\ \ // /\ \ /\\/ \__/ / Copyright 2014-now Daniel Dietrich
* /___/\_/ \_/\____/\_/ \_/\__\/__/___\_/ \_// \__/_____/ Licensed under the Apache License, Version 2.0
*/
package javaslang.concurrent;
import javaslang.collection.Queue;
import javaslang.control.None;
import javaslang.control.Option;
import javaslang.control.Some;
import javaslang.control.Try;
import javaslang.control.Try.CheckedSupplier;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
/**
* A Future is a computation result that becomes available at some point.
* <p>
* <strong>State</strong>
* <p>
* A Future has two states: complete and not complete.
* <p>
* If a Future is <em>not complete</em>, {@code getValue()} returns {@code None}
* and {@code onComplete()} will then enqueue the given {@code action}.
* <p>
* If a Future is <em>complete</em>, {@code getValue()} returns {@code Some(value)}
* and {@code onComplete()} will immediately perform the given {@code action}.
* <p>
* <strong>Execution</strong>
* <p>
* Both the given {@code computation} and the given {@code actions} are executed using the underlying {@code Executor}.
* If no {@code Executor} is specified, {@link ForkJoinPool#commonPool()} is used.
*
* @param <T> Type of the computation.
*/
public interface Future<T> {
static <T> Future<T> of(CheckedSupplier<T> computation) {
final FutureImpl<T> future = new FutureImpl<>(ForkJoinPool.commonPool());
future.run(computation);
return future;
}
Option<Try<T>> getValue();
boolean isComplete();
void onComplete(Consumer<? super Try<T>> action);
}
final class FutureImpl<T> implements Future<T> {
private final Executor executor;
/**
* Once the Future is completed, the value is defined and the actions are performed and set to null.
* The fields 'value' and 'actions' are guarded by 'lock'.
*/
private final Object lock = new Object();
private volatile Option<Try<T>> value = None.instance();
private Queue<Consumer<? super Try<T>>> actions = Queue.empty();
FutureImpl(Executor executor) {
this.executor = executor;
}
@Override
public Option<Try<T>> getValue() {
return value;
}
@Override
public boolean isComplete() {
return value.isDefined();
}
@Override
public void onComplete(Consumer<? super Try<T>> action) {
if (value.isDefined()) {
perform(action);
} else {
synchronized (lock) {
if (value.isDefined()) {
perform(action);
} else {
actions = actions.enqueue(action);
}
}
}
}
// Running outside to ensure 'this' is fully initialized.
// This comes handy for Promise, a writable Future.
void run(CheckedSupplier<T> computation) {
executor.execute(() -> {
final Try<T> value = Try.of(computation);
final Queue<Consumer<? super Try<T>>> actions;
synchronized (lock) {
this.value = new Some<>(value);
actions = this.actions;
this.actions = null;
}
actions.forEach(this::perform);
});
}
private void perform(Consumer<? super Try<T>> action) {
executor.execute(() -> action.accept(value.get()));
}
}
import javaslang.concurrent.Future;
import javaslang.control.Try;
public class Test {
public static void main(String[] args) {
final Future<String> future = Future.of(() -> {
Thread.sleep(1000);
return "Hello";
});
future.onComplete(System.out::println);
synchronized (future) {
while (!future.isComplete()) {
Try.run(() -> {
System.out.println("Waiting...");
Thread.sleep(500);
});
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment