Skip to content

Instantly share code, notes, and snippets.

@esfand
Created April 26, 2014 18:53
Show Gist options
  • Save esfand/11327966 to your computer and use it in GitHub Desktop.
Save esfand/11327966 to your computer and use it in GitHub Desktop.
package com.hanhuy.android.common.concurrent;
import android.os.AsyncTask;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
/**
* @author pfnguyen
*/
public abstract class Future<V> implements java.util.concurrent.Future<V> {
public interface Callback<V> {
public void onCallback(V value);
}
public abstract void onComplete(Callback<Try<V>> fn);
public abstract void onSuccess(Callback<V> fn);
public abstract void onFailure(Callback<Throwable> fn);
public abstract <T> Future<T> map(Function<V,T> fn);
public abstract <T> Future<T> flatMap(Function<V,Future<T>> fn);
public abstract boolean isFailed();
/**
* Returns the successful value if this future is completed. Will throw
* if it is not done, or has failed.
* @throws IllegalStateException
*/
public abstract V getValue() throws IllegalStateException;
public abstract Throwable getError();
public static <T> Future<T> create(final Callable<T> callable) {
final Promise<T> promise = Promise.create();
final Runnable r = new Runnable() {
@Override
public void run() {
try {
promise.success(callable.call());
} catch (Throwable t) {
promise.failure(t);
}
}
};
AsyncTask.THREAD_POOL_EXECUTOR.execute(r);
return promise;
}
public static Future<Void> join(Future<?>... futures) {
return join(Arrays.asList(futures));
}
@SuppressWarnings("unchecked")
/**
* The result object of this Future is just some arbitrary object without
* meaning. Just use onSuccess/onFailure to determine whether or not it
* completed execution.
*
* Creates a new Future that waits for all of the list's futures to
* complete.
*/
public static Future<Void> join(List<Future<?>> list) {
// so we can cast to a bogus type and get around the typechecks
List l = list;
List<Future<Object>> l2 = (List<Future<Object>>) l;
return sequence(l2).map(new Function<List<Object>, Void>() {
@Override
public Void apply(List<Object> input) {
return null;
}
});
}
/**
* Takes a list of Future[T] and converts it into a single Future that
* returns the List[T]
*/
public static <T> Future<List<T>> sequence(List<Future<T>> list) {
final Promise<List<T>> promise = Promise.create();
final int count = list.size();
final int[] completed = {0};
final Object[] items = new Object[count];
final Object lock = new Object();
for (int i = 0; i < count; i++) {
final int j = i;
Future<T> fut = list.get(i);
fut.onComplete(new Callback<Try<T>>() {
@Override
public void onCallback(Try<T> value) {
if (value instanceof Try.Success) {
Try.Success<T> success = (Try.Success<T>) value;
synchronized(lock) {
items[j] = success.value;
completed[0]++;
if (count == completed[0]) {
promise.success((List<T>) Arrays.asList(items));
}
}
} else {
Try.Failure failure = (Try.Failure) value;
promise.failure(failure.getError());
}
}
});
}
return promise;
}
public static abstract class Try<V> {
public abstract <T> Try<T> map(Function<V,T> fn);
@SuppressWarnings("unchecked")
public static <T> Try<T> create(T value) {
return new Success(value);
}
@SuppressWarnings("unchecked")
public static <T> Try<T> create(T value, Throwable error) {
// no nullcheck because we want to support Future[Void]
return error != null ? new Failure(error) : new Success(value);
}
@SuppressWarnings("unchecked")
public static class Success<V> extends Try<V> {
private final V value;
private Success(V value) {
this.value = value;
}
public <T> Success<T> map(Function<V,T> fn) {
return new Success(fn.apply(value));
}
public V getValue() {
return value;
}
@Override
public String toString() {
return "Success: " + value;
}
}
public static class Failure<V> extends Try<V> {
private Failure(Throwable error) {
Preconditions.checkNotNull(error, "error may not be null");
this.error = error;
}
private final Throwable error;
@SuppressWarnings("unchecked")
public <T> Failure<T> map(Function<V,T> fn) {
return (Failure<T>) this;
}
public Throwable getError() {
return error;
}
@Override
public String toString() {
return "Failure: " + error;
}
}
}
}
package com.hanhuy.android.common.concurrent;
import android.os.Handler;
import android.os.Looper;
import android.util.Pair;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @author pfnguyen
*/
public class Promise<V> extends Future<V> {
private static final String TAG = "Promise";
private List<Callback<Try<V>>> onCompletionQueue = Lists.newArrayList();
private List<Callback<V>> onSuccessQueue = Lists.newCopyOnWriteArrayList();
private List<Callback<Throwable>> onFailureQueue = Lists.newArrayList();
private V value;
private Throwable error;
private boolean canceled;
private boolean done;
private List<Pair<Promise<Object>,Function<Object,Object>>> mapped =
Lists.newArrayList();
private List<Pair<Promise<Object>,Function<Object,Future<Object>>>> flatMapped =
Lists.newArrayList();
private final static Handler handler = new Handler(Looper.getMainLooper());
public static <T> Promise<T> create() {
return new Promise<T>();
}
public static <T> Promise<T> create(T value) {
Promise<T> promise = Promise.create();
promise.success(value);
return promise;
}
public void success(V value) {
if (!isDone())
complete(Try.create(value));
}
public void failure(Throwable error) {
Log.v(TAG, "Promise not kept", error);
if (!isDone())
complete(Try.<V>create(null, error));
}
/**
* Sends the result to the UI thread callbacks, if any.
*/
protected synchronized void complete(final Try<V> result) {
if (result instanceof Try.Success) {
// TODO run in a background thread if we are currently on UI thread
Try.Success<V> success = (Try.Success<V>) result;
value = success.getValue();
for (Pair<Promise<Object>,Function<Object,Object>> pair : mapped) {
pair.first.success(pair.second.apply(value));
}
for (final Pair<Promise<Object>,Function<Object,Future<Object>>> pair : flatMapped) {
pair.second.apply(value).onComplete(new Callback<Try<Object>>() {
@Override
public void onCallback(Try<Object> result) {
pair.first.complete(result);
}
});
}
} else if (result instanceof Try.Failure) {
Try.Failure<V> failure = (Try.Failure<V>) result;
error = failure.getError();
for (Pair<Promise<Object>,Function<Object,Object>> pair : mapped) {
pair.first.failure(error);
}
for (Pair<Promise<Object>,Function<Object,Future<Object>>> pair : flatMapped) {
pair.first.failure(error);
}
}
done = true;
notifyAll();
mapped.clear();
flatMapped.clear();
if (Looper.getMainLooper().getThread() != Thread.currentThread()) {
handler.post(new Runnable() {
public void run() {
completeInternal(result);
}
});
}
else
completeInternal(result);
}
private synchronized void completeInternal(Try<V> result) {
for (Callback<Try<V>> cb : onCompletionQueue)
cb.onCallback(result);
if (result instanceof Try.Success) {
for (Callback<V> cb : onSuccessQueue) {
cb.onCallback(value);
}
} else if (result instanceof Try.Failure) {
for (Callback<Throwable> cb : onFailureQueue)
cb.onCallback(error);
}
onCompletionQueue.clear();
onFailureQueue.clear();
onSuccessQueue.clear();
}
@Override
public synchronized void onComplete(Callback<Try<V>> fn) {
if (done) {
fn.onCallback(Try.create(value, error));
} else {
onCompletionQueue.add(fn);
}
}
@Override
public synchronized void onSuccess(Callback<V> fn) {
if (done && error == null) {
fn.onCallback(value);
} else if (!done) {
onSuccessQueue.add(fn);
}
}
@Override
public synchronized void onFailure(Callback<Throwable> fn) {
if (done && error != null) {
fn.onCallback(error);
} else if (!done) {
onFailureQueue.add(fn);
}
}
@Override
@SuppressWarnings("unchecked")
public synchronized <T> Future<T> map(Function<V, T> fn) {
Promise<T> promise = Promise.create();
if (isDone()) {
if (error != null)
promise.failure(error);
else
promise.success(fn.apply(value));
} else {
// yeah, this needs to be unchecked...
mapped.add(new Pair(promise, fn));
}
return promise;
}
@Override
@SuppressWarnings("unchecked")
public synchronized <T> Future<T> flatMap(Function<V, Future<T>> fn) {
Promise<T> promise = Promise.create();
flatMapped.add(new Pair(promise, fn));
if (isDone()) {
if (error != null) {
promise.failure(error);
} else
return fn.apply(value);
}
return promise;
}
@Override
public synchronized boolean cancel(boolean b) {
onCompletionQueue.clear();
onSuccessQueue.clear();
onFailureQueue.clear();
for (Pair<Promise<Object>,?> pair : mapped) {
pair.first.cancel(true);
}
for (Pair<Promise<Object>,?> pair : flatMapped) {
pair.first.cancel(true);
}
mapped.clear();
flatMapped.clear();
if (!done)
canceled = true;
notifyAll();
return !done;
}
@Override
public boolean isCancelled() {
return canceled;
}
@Override
public boolean isDone() {
return done;
}
@Override
public boolean isFailed() {
return error != null;
}
@Override
public synchronized V getValue() throws IllegalStateException {
if (!isDone() || isFailed())
throw new IllegalStateException("This future is not done");
if (canceled)
throw new IllegalStateException("This future has been canceled");
return value;
}
@Override
public synchronized Throwable getError() {
if (!isDone() && !isFailed())
throw new IllegalStateException("This future did not fail");
return error;
}
/**
* Generally, get() should not be used in favor of onSuccess and onFailure
*/
@Override
public V get() throws InterruptedException, ExecutionException {
synchronized (this) {
while (!done && !canceled) {
wait();
}
}
if (error != null) throw new ExecutionException("Future failed", error);
if (canceled) throw new ExecutionException("Future canceled",
new RuntimeException());
return value;
}
/**
* Generally, get() should not be used in favor of onSuccess and onFailure
*/
@Override
public V get(long l, TimeUnit timeUnit)
throws InterruptedException, ExecutionException, TimeoutException {
synchronized (this) {
if (!done && !canceled)
wait(TimeUnit.MILLISECONDS.convert(l, timeUnit));
}
if (error != null) throw new ExecutionException("Future failed", error);
if (canceled) throw new ExecutionException("Future canceled",
new RuntimeException());
return value;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment