Skip to content

Instantly share code, notes, and snippets.

@vitamon
Created July 29, 2014 13:50
Show Gist options
  • Save vitamon/0c508dea00b2be51d898 to your computer and use it in GitHub Desktop.
Save vitamon/0c508dea00b2be51d898 to your computer and use it in GitHub Desktop.
import akka.actor.Cancellable;
import akka.actor.Scheduler;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import static akka.dispatch.Futures.future;
import static akka.dispatch.Futures.promise;
/**
 * Runs a {@link Callable} as a Scala {@link Future} that fails with a
 * {@link TimeoutException} when the callable does not finish in time.
 *
 * <p>When the timeout wins, the thread that is currently executing the callable
 * is interrupted. The interrupt is only ever delivered while the callable is
 * actually running on that thread, so a pooled thread is never interrupted after
 * it has moved on to other work.
 */
public class TimeoutFuture
{
    /**
     * Executes {@code block} on {@code ctx} and bounds its duration by {@code timeout}.
     *
     * <p>The returned future is completed by whichever happens first: the block
     * finishing (normally or with an exception) or the timeout elapsing. If the
     * timeout elapses first, the future fails with a {@link TimeoutException} and
     * the thread running the block is interrupted; if the block has not started
     * yet, it is skipped instead of being run for a result nobody can observe.
     *
     * @param block     the computation to run
     * @param timeout   how long to wait before failing the returned future
     * @param ctx       execution context used for both the block and the timeout task
     * @param scheduler scheduler used to arm the timeout
     * @param <A>       result type of the computation
     * @return a future holding the block's outcome, or a {@link TimeoutException}
     */
    public static <A> Future<A> apply(final Callable<A> block, FiniteDuration timeout, ExecutionContext ctx, Scheduler scheduler)
    {
        final Promise<A> promise = promise();
        final WorkerHandle worker = new WorkerHandle();

        // Timeout side: fail the promise, and only if that wins, stop the worker.
        final Cancellable timer = scheduler.scheduleOnce(timeout, new Runnable()
        {
            @Override
            public void run()
            {
                if (promise.tryFailure(new TimeoutException()))
                {
                    worker.timeOut();
                }
            }
        }, ctx);

        // Business-logic side: run the block between enter() and exit().
        Future<A> f = future(new Callable<A>()
        {
            @Override
            public A call() throws Exception
            {
                if (!worker.enter())
                {
                    // The timeout fired before we started; the promise already
                    // carries the TimeoutException, so this failure is ignored
                    // by tryCompleteWith below.
                    throw new TimeoutException();
                }
                try
                {
                    return block.call();
                }
                finally
                {
                    // Detach first so a late timer can no longer interrupt this
                    // thread, then disarm the timer.
                    worker.exit();
                    timer.cancel();
                }
            }
        }, ctx);

        promise.tryCompleteWith(f);
        return promise.future();
    }

    /**
     * Coordinates the worker thread and the timeout task. All state is guarded by
     * the instance monitor, which makes "is the block still running?" and
     * "interrupt it" a single atomic step.
     */
    private static final class WorkerHandle
    {
        /** Thread currently executing the block, or null when not running. */
        private Thread thread;
        /** Set once the timeout has fired. */
        private boolean timedOut;
        /** Set when the timeout actually interrupted the running thread. */
        private boolean interruptSent;

        /**
         * Registers the calling thread as the one running the block.
         *
         * @return false if the timeout has already fired and the block must not run
         */
        synchronized boolean enter()
        {
            if (timedOut)
            {
                return false;
            }
            thread = Thread.currentThread();
            return true;
        }

        /**
         * Unregisters the calling thread. If the timeout interrupted it, the
         * interrupt flag is cleared so it does not leak into whatever the pooled
         * thread runs next.
         */
        synchronized void exit()
        {
            thread = null;
            if (interruptSent)
            {
                Thread.interrupted();
            }
        }

        /** Marks the timeout as fired and interrupts the block if it is running. */
        synchronized void timeOut()
        {
            timedOut = true;
            if (thread != null)
            {
                interruptSent = true;
                thread.interrupt();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment