Created
April 26, 2014 18:53
-
-
Save esfand/11327966 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.hanhuy.android.common.concurrent; | |
import android.os.AsyncTask; | |
import com.google.common.base.Function; | |
import com.google.common.base.Preconditions; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.concurrent.Callable; | |
/** | |
* @author pfnguyen | |
*/ | |
public abstract class Future<V> implements java.util.concurrent.Future<V> { | |
public interface Callback<V> { | |
public void onCallback(V value); | |
} | |
public abstract void onComplete(Callback<Try<V>> fn); | |
public abstract void onSuccess(Callback<V> fn); | |
public abstract void onFailure(Callback<Throwable> fn); | |
public abstract <T> Future<T> map(Function<V,T> fn); | |
public abstract <T> Future<T> flatMap(Function<V,Future<T>> fn); | |
public abstract boolean isFailed(); | |
/** | |
* Returns the successful value if this future is completed. Will throw | |
* if it is not done, or has failed. | |
* @throws IllegalStateException | |
*/ | |
public abstract V getValue() throws IllegalStateException; | |
public abstract Throwable getError(); | |
public static <T> Future<T> create(final Callable<T> callable) { | |
final Promise<T> promise = Promise.create(); | |
final Runnable r = new Runnable() { | |
@Override | |
public void run() { | |
try { | |
promise.success(callable.call()); | |
} catch (Throwable t) { | |
promise.failure(t); | |
} | |
} | |
}; | |
AsyncTask.THREAD_POOL_EXECUTOR.execute(r); | |
return promise; | |
} | |
public static Future<Void> join(Future<?>... futures) { | |
return join(Arrays.asList(futures)); | |
} | |
@SuppressWarnings("unchecked") | |
/** | |
* The result object of this Future is just some arbitrary object without | |
* meaning. Just use onSuccess/onFailure to determine whether or not it | |
* completed execution. | |
* | |
* Creates a new Future that waits for all of the list's futures to | |
* complete. | |
*/ | |
public static Future<Void> join(List<Future<?>> list) { | |
// so we can cast to a bogus type and get around the typechecks | |
List l = list; | |
List<Future<Object>> l2 = (List<Future<Object>>) l; | |
return sequence(l2).map(new Function<List<Object>, Void>() { | |
@Override | |
public Void apply(List<Object> input) { | |
return null; | |
} | |
}); | |
} | |
/** | |
* Takes a list of Future[T] and converts it into a single Future that | |
* returns the List[T] | |
*/ | |
public static <T> Future<List<T>> sequence(List<Future<T>> list) { | |
final Promise<List<T>> promise = Promise.create(); | |
final int count = list.size(); | |
final int[] completed = {0}; | |
final Object[] items = new Object[count]; | |
final Object lock = new Object(); | |
for (int i = 0; i < count; i++) { | |
final int j = i; | |
Future<T> fut = list.get(i); | |
fut.onComplete(new Callback<Try<T>>() { | |
@Override | |
public void onCallback(Try<T> value) { | |
if (value instanceof Try.Success) { | |
Try.Success<T> success = (Try.Success<T>) value; | |
synchronized(lock) { | |
items[j] = success.value; | |
completed[0]++; | |
if (count == completed[0]) { | |
promise.success((List<T>) Arrays.asList(items)); | |
} | |
} | |
} else { | |
Try.Failure failure = (Try.Failure) value; | |
promise.failure(failure.getError()); | |
} | |
} | |
}); | |
} | |
return promise; | |
} | |
public static abstract class Try<V> { | |
public abstract <T> Try<T> map(Function<V,T> fn); | |
@SuppressWarnings("unchecked") | |
public static <T> Try<T> create(T value) { | |
return new Success(value); | |
} | |
@SuppressWarnings("unchecked") | |
public static <T> Try<T> create(T value, Throwable error) { | |
// no nullcheck because we want to support Future[Void] | |
return error != null ? new Failure(error) : new Success(value); | |
} | |
@SuppressWarnings("unchecked") | |
public static class Success<V> extends Try<V> { | |
private final V value; | |
private Success(V value) { | |
this.value = value; | |
} | |
public <T> Success<T> map(Function<V,T> fn) { | |
return new Success(fn.apply(value)); | |
} | |
public V getValue() { | |
return value; | |
} | |
@Override | |
public String toString() { | |
return "Success: " + value; | |
} | |
} | |
public static class Failure<V> extends Try<V> { | |
private Failure(Throwable error) { | |
Preconditions.checkNotNull(error, "error may not be null"); | |
this.error = error; | |
} | |
private final Throwable error; | |
@SuppressWarnings("unchecked") | |
public <T> Failure<T> map(Function<V,T> fn) { | |
return (Failure<T>) this; | |
} | |
public Throwable getError() { | |
return error; | |
} | |
@Override | |
public String toString() { | |
return "Failure: " + error; | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.hanhuy.android.common.concurrent; | |
import android.os.Handler; | |
import android.os.Looper; | |
import android.util.Pair; | |
import com.google.common.base.Function; | |
import com.google.common.collect.Lists; | |
import java.util.List; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.TimeoutException; | |
/** | |
* @author pfnguyen | |
*/ | |
public class Promise<V> extends Future<V> { | |
private static final String TAG = "Promise"; | |
private List<Callback<Try<V>>> onCompletionQueue = Lists.newArrayList(); | |
private List<Callback<V>> onSuccessQueue = Lists.newCopyOnWriteArrayList(); | |
private List<Callback<Throwable>> onFailureQueue = Lists.newArrayList(); | |
private V value; | |
private Throwable error; | |
private boolean canceled; | |
private boolean done; | |
private List<Pair<Promise<Object>,Function<Object,Object>>> mapped = | |
Lists.newArrayList(); | |
private List<Pair<Promise<Object>,Function<Object,Future<Object>>>> flatMapped = | |
Lists.newArrayList(); | |
private final static Handler handler = new Handler(Looper.getMainLooper()); | |
public static <T> Promise<T> create() { | |
return new Promise<T>(); | |
} | |
public static <T> Promise<T> create(T value) { | |
Promise<T> promise = Promise.create(); | |
promise.success(value); | |
return promise; | |
} | |
public void success(V value) { | |
if (!isDone()) | |
complete(Try.create(value)); | |
} | |
public void failure(Throwable error) { | |
Log.v(TAG, "Promise not kept", error); | |
if (!isDone()) | |
complete(Try.<V>create(null, error)); | |
} | |
/** | |
* Sends the result to the UI thread callbacks, if any. | |
*/ | |
protected synchronized void complete(final Try<V> result) { | |
if (result instanceof Try.Success) { | |
// TODO run in a background thread if we are currently on UI thread | |
Try.Success<V> success = (Try.Success<V>) result; | |
value = success.getValue(); | |
for (Pair<Promise<Object>,Function<Object,Object>> pair : mapped) { | |
pair.first.success(pair.second.apply(value)); | |
} | |
for (final Pair<Promise<Object>,Function<Object,Future<Object>>> pair : flatMapped) { | |
pair.second.apply(value).onComplete(new Callback<Try<Object>>() { | |
@Override | |
public void onCallback(Try<Object> result) { | |
pair.first.complete(result); | |
} | |
}); | |
} | |
} else if (result instanceof Try.Failure) { | |
Try.Failure<V> failure = (Try.Failure<V>) result; | |
error = failure.getError(); | |
for (Pair<Promise<Object>,Function<Object,Object>> pair : mapped) { | |
pair.first.failure(error); | |
} | |
for (Pair<Promise<Object>,Function<Object,Future<Object>>> pair : flatMapped) { | |
pair.first.failure(error); | |
} | |
} | |
done = true; | |
notifyAll(); | |
mapped.clear(); | |
flatMapped.clear(); | |
if (Looper.getMainLooper().getThread() != Thread.currentThread()) { | |
handler.post(new Runnable() { | |
public void run() { | |
completeInternal(result); | |
} | |
}); | |
} | |
else | |
completeInternal(result); | |
} | |
private synchronized void completeInternal(Try<V> result) { | |
for (Callback<Try<V>> cb : onCompletionQueue) | |
cb.onCallback(result); | |
if (result instanceof Try.Success) { | |
for (Callback<V> cb : onSuccessQueue) { | |
cb.onCallback(value); | |
} | |
} else if (result instanceof Try.Failure) { | |
for (Callback<Throwable> cb : onFailureQueue) | |
cb.onCallback(error); | |
} | |
onCompletionQueue.clear(); | |
onFailureQueue.clear(); | |
onSuccessQueue.clear(); | |
} | |
@Override | |
public synchronized void onComplete(Callback<Try<V>> fn) { | |
if (done) { | |
fn.onCallback(Try.create(value, error)); | |
} else { | |
onCompletionQueue.add(fn); | |
} | |
} | |
@Override | |
public synchronized void onSuccess(Callback<V> fn) { | |
if (done && error == null) { | |
fn.onCallback(value); | |
} else if (!done) { | |
onSuccessQueue.add(fn); | |
} | |
} | |
@Override | |
public synchronized void onFailure(Callback<Throwable> fn) { | |
if (done && error != null) { | |
fn.onCallback(error); | |
} else if (!done) { | |
onFailureQueue.add(fn); | |
} | |
} | |
@Override | |
@SuppressWarnings("unchecked") | |
public synchronized <T> Future<T> map(Function<V, T> fn) { | |
Promise<T> promise = Promise.create(); | |
if (isDone()) { | |
if (error != null) | |
promise.failure(error); | |
else | |
promise.success(fn.apply(value)); | |
} else { | |
// yeah, this needs to be unchecked... | |
mapped.add(new Pair(promise, fn)); | |
} | |
return promise; | |
} | |
@Override | |
@SuppressWarnings("unchecked") | |
public synchronized <T> Future<T> flatMap(Function<V, Future<T>> fn) { | |
Promise<T> promise = Promise.create(); | |
flatMapped.add(new Pair(promise, fn)); | |
if (isDone()) { | |
if (error != null) { | |
promise.failure(error); | |
} else | |
return fn.apply(value); | |
} | |
return promise; | |
} | |
@Override | |
public synchronized boolean cancel(boolean b) { | |
onCompletionQueue.clear(); | |
onSuccessQueue.clear(); | |
onFailureQueue.clear(); | |
for (Pair<Promise<Object>,?> pair : mapped) { | |
pair.first.cancel(true); | |
} | |
for (Pair<Promise<Object>,?> pair : flatMapped) { | |
pair.first.cancel(true); | |
} | |
mapped.clear(); | |
flatMapped.clear(); | |
if (!done) | |
canceled = true; | |
notifyAll(); | |
return !done; | |
} | |
@Override | |
public boolean isCancelled() { | |
return canceled; | |
} | |
@Override | |
public boolean isDone() { | |
return done; | |
} | |
@Override | |
public boolean isFailed() { | |
return error != null; | |
} | |
@Override | |
public synchronized V getValue() throws IllegalStateException { | |
if (!isDone() || isFailed()) | |
throw new IllegalStateException("This future is not done"); | |
if (canceled) | |
throw new IllegalStateException("This future has been canceled"); | |
return value; | |
} | |
@Override | |
public synchronized Throwable getError() { | |
if (!isDone() && !isFailed()) | |
throw new IllegalStateException("This future did not fail"); | |
return error; | |
} | |
/** | |
* Generally, get() should not be used in favor of onSuccess and onFailure | |
*/ | |
@Override | |
public V get() throws InterruptedException, ExecutionException { | |
synchronized (this) { | |
while (!done && !canceled) { | |
wait(); | |
} | |
} | |
if (error != null) throw new ExecutionException("Future failed", error); | |
if (canceled) throw new ExecutionException("Future canceled", | |
new RuntimeException()); | |
return value; | |
} | |
/** | |
* Generally, get() should not be used in favor of onSuccess and onFailure | |
*/ | |
@Override | |
public V get(long l, TimeUnit timeUnit) | |
throws InterruptedException, ExecutionException, TimeoutException { | |
synchronized (this) { | |
if (!done && !canceled) | |
wait(TimeUnit.MILLISECONDS.convert(l, timeUnit)); | |
} | |
if (error != null) throw new ExecutionException("Future failed", error); | |
if (canceled) throw new ExecutionException("Future canceled", | |
new RuntimeException()); | |
return value; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment