Skip to content

Instantly share code, notes, and snippets.

@jhalterman
Last active January 30, 2022 12:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save jhalterman/3c6221f8fefd4fa97ce4623a614fad6b to your computer and use it in GitHub Desktop.
Save jhalterman/3c6221f8fefd4fa97ce4623a614fad6b to your computer and use it in GitHub Desktop.
A cancellable Scala Future
package net.jodah.cancellablefuture
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}
/**
 * Factory for [[CancellableFuture]] instances.
 */
object CancellableFuture {

  /**
   * Creates a cancellable future that evaluates `body` on the given execution context.
   *
   * @param body     the by-name computation handed to the new future
   * @param executor the execution context passed on to the new future
   * @tparam T the result type of the computation
   * @return a new [[CancellableFuture]] wrapping `body`
   */
  def apply[T](body: => T)(implicit executor: ExecutionContext): CancellableFuture[T] = {
    val future = new CancellableFuture[T](body)(executor)
    future
  }
}
/**
 * A future that supports cancellation and interruption, even when backed by a ForkJoinPool.
 *
 * The body is submitted to `executor` as soon as the instance is constructed. All `Future`
 * operations are delegated to an internal promise, which is completed either with the outcome
 * of the body or, if [[cancel]] wins the race, with `Failure(CancellationException)`.
 *
 * NOTE(review): on Scala 2.12+ `Future` also declares abstract `transform`/`transformWith`
 * members that this class does not implement — confirm the targeted Scala version.
 *
 * @param body     the computation to run; it is skipped if the future was already completed
 *                 (e.g. cancelled) by the time the executor picks the task up
 * @param executor the execution context the body is submitted to
 * @tparam T the result type of the computation
 */
class CancellableFuture[T](body: => T)(implicit executor: ExecutionContext) extends Future[T] {
  private val promise = Promise[T]()
  // Thread currently evaluating `body`, or null when the body is not running. Guarded by `this`.
  private var thread: Thread = null
  private val cancelled = new AtomicBoolean()

  promise tryCompleteWith Future {
    if (!promise.isCompleted) {
      this.synchronized {
        thread = Thread.currentThread
      }
      try {
        body
      } finally {
        this.synchronized {
          // Clears the interrupt flag so a late cancel(true) does not leak into the next task
          // that runs on this pooled thread.
          Thread.interrupted()
          thread = null
        }
      }
    } else
      // Already completed (cancelled before it started): the value is ignored by tryCompleteWith.
      null.asInstanceOf[T]
  }

  /**
   * Attempts to cancel the future. Cancellation succeeds if the future is not yet complete or cancelled. A cancelled
   * future will have a result of Failure(CancellationException).
   *
   * If the body is already running it is not stopped unless `interrupt` is true, in which case the
   * executing thread is interrupted; the future is completed with the cancellation either way.
   *
   * @param interrupt Whether to interrupt the running Future
   * @return True if the execution was cancelled
   */
  def cancel(interrupt: Boolean): Boolean = {
    if (!promise.isCompleted && cancelled.compareAndSet(false, true)) {
      if (promise.tryComplete(Failure(new CancellationException()))) {
        if (interrupt) {
          this.synchronized {
            if (thread != null)
              thread.interrupt()
          }
        }
        true
      } else {
        // The body completed the promise between the isCompleted check and tryComplete, so the
        // future holds the body's outcome: undo the flag and report that nothing was cancelled.
        cancelled.set(false)
        false
      }
    } else
      false
  }

  /**
   * Returns whether the future was cancelled.
   */
  def isCancelled: Boolean = cancelled.get()

  override def onComplete[U](f: Try[T] => U)(implicit executor: ExecutionContext): Unit = promise.future.onComplete(f)

  override def isCompleted: Boolean = promise.future.isCompleted

  override def value: Option[Try[T]] = promise.future.value

  /**
   * Waits until this future is completed (successfully, with a failure, or by cancellation).
   *
   * @param atMost the maximum time to wait
   * @return this future, once it is completed
   */
  @throws(classOf[TimeoutException])
  @throws(classOf[InterruptedException])
  def ready(atMost: Duration)(implicit permit: CanAwait): this.type = {
    // Only wait for completion; the outcome must not be unwrapped (and thus thrown) here.
    promise.future.ready(atMost)
    this
  }

  /**
   * Waits for this future to complete and returns its result, delegating to the underlying future.
   *
   * @param atMost the maximum time to wait
   * @return the successful result of the future
   */
  @throws(classOf[Exception])
  def result(atMost: Duration)(implicit permit: CanAwait): T = promise.future.result(atMost)
}
package net.jodah.cancellablefuture
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import org.scalatest.concurrent.{ScalaFutures, Waiters}
import org.scalatest.{FunSpec, Matchers}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{CancellationException, ExecutionContext}
/**
 * Behavioural tests for [[CancellableFuture]]: normal completion, cancellation before and during
 * execution, repeated cancellation, and interruption of a running body.
 */
class CancellableFutureSpec extends FunSpec with Matchers with Waiters with ScalaFutures {
  describe("CancelleableFuture") {
    it("should complete with result") {
      val f = CancellableFuture("test")
      f.futureValue shouldBe "test"
      f.isCompleted shouldBe true
      f.isCancelled shouldBe false
    }

    it("should be cancellable") {
      val f = CancellableFuture {
        Thread.sleep(1000)
      }
      f.cancel(false) shouldBe true
      f.failed.futureValue shouldBe a[CancellationException]
      f.isCompleted shouldBe true
      f.isCancelled shouldBe true
    }

    it("should immediately cancel pending task") {
      val pool = Executors.newSingleThreadExecutor()
      try {
        val ec = ExecutionContext.fromExecutor(pool)
        // Fully utilize the ExecutionContext. A latch is used rather than Waiter.await, which is
        // meant to be called only from the thread that created the Waiter.
        val release = new java.util.concurrent.CountDownLatch(1)
        CancellableFuture {
          release.await()
        }(ec)
        val executed = new AtomicBoolean()
        val f = CancellableFuture {
          executed.set(true)
        }(ec)
        f.cancel(false) shouldBe true
        release.countDown()
        // The single-threaded executor runs tasks in submission order, so once this sentinel has
        // completed the cancelled task has already been dequeued (and its body skipped).
        CancellableFuture("drained")(ec).futureValue shouldBe "drained"
        executed.get() shouldBe false
        f.failed.futureValue shouldBe a[CancellationException]
        f.isCompleted shouldBe true
        f.isCancelled shouldBe true
      } finally {
        pool.shutdown()
      }
    }

    it("should not support repeated cancellation") {
      val f = CancellableFuture {
        Thread.sleep(1000)
      }
      f.cancel(false) shouldBe true
      f.cancel(false) shouldBe false
    }

    it("should be interruptable") {
      val waiter = new Waiter()
      val started = new java.util.concurrent.CountDownLatch(1)
      val f = CancellableFuture {
        started.countDown()
        try {
          Thread.sleep(1000)
        } catch {
          case _: InterruptedException => waiter.dismiss()
        }
      }
      // Wait until the body is actually running so that cancel(true) has a thread to interrupt.
      started.await(1, java.util.concurrent.TimeUnit.SECONDS) shouldBe true
      f.cancel(true) shouldBe true
      f.failed.futureValue shouldBe a[CancellationException]
      waiter.await()
    }
  }
}
@jhalterman
Copy link
Author

Currently, cancel returns true if the task is already running. The body will still continue to run, but the Future will be completed with CancellationException.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment