Skip to content

Instantly share code, notes, and snippets.

@plaflamme
Last active December 17, 2015 08:19
Show Gist options
  • Save plaflamme/5579299 to your computer and use it in GitHub Desktop.
Save plaflamme/5579299 to your computer and use it in GitHub Desktop.
/**
 * A [[FuturePool]] that runs each computation on `executor` and forwards an
 * interrupt raised on the returned future to the task submitted to that executor.
 *
 * When the returned future is interrupted, the submitted task is always
 * cancelled with `cancel(true)`. If the computation has not started yet, the
 * future is additionally failed with a `CancellationException` whose cause is
 * the interrupt; if it has already started, the running task is left to
 * satisfy the future itself.
 *
 * @param executor the executor service every computation is submitted to
 */
class CancelableExecutorServiceFuturePool(val executor: ExecutorService) extends FuturePool {

  /**
   * Submits `f` to `executor` and returns a future for its outcome.
   *
   * If the executor rejects the task, the returned future is failed with the
   * `RejectedExecutionException` instead of the exception being thrown to the caller.
   *
   * @tparam T the result type of the computation
   * @param f the computation; it is evaluated inside the submitted task
   * @return a future satisfied with `Try(f)`, or failed on rejection/cancellation
   */
  def apply[T](f: => T): Future[T] = {
    // Single-use permit: whoever flips it from true to false (the task, the
    // interrupt handler, or the rejection path) is the only party allowed to
    // satisfy `p`, so `p` is never set twice.
    val runOk = new AtomicBoolean(true)
    val p = new Promise[T]

    val task = new Runnable {
      // Snapshot taken when the task is constructed, i.e. on the thread calling apply.
      val saved = Local.save()

      def run(): Unit = {
        // Make an effort to skip work in the case the promise
        // has been cancelled or already defined.
        if (runOk.compareAndSet(true, false)) {
          val current = Local.save()
          Local.restore(saved)
          // NOTE(review): if Try(f) itself throws rather than capturing the
          // failure, `p` is never satisfied — confirm which exceptions Try rethrows.
          try p.update(Try(f))
          finally Local.restore(current) // always put the running thread's own state back
        }
      }
    }

    // This is safe: the only thing that can call task.run() is
    // executor, the only thing that can raise an interrupt is the
    // receiver of this value, which will then be fully initialized.
    // None means the executor rejected the task, so there is nothing to cancel later.
    val javaFuture =
      try Option(executor.submit(task))
      catch {
        case e: RejectedExecutionException =>
          runOk.set(false)
          p.setException(e)
          None
      }

    p.setInterruptHandler {
      case cause =>
        // Cancel first so a task that is already running gets interrupted too.
        javaFuture.foreach(_.cancel(true))
        // Only fail the promise here if the task has not claimed the permit;
        // otherwise the running task is responsible for satisfying it.
        if (runOk.compareAndSet(true, false)) {
          val exc = new CancellationException
          exc.initCause(cause)
          p.setException(exc)
        }
    }

    p
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment