Skip to content

Instantly share code, notes, and snippets.

@Tolsi
Last active September 7, 2015 07:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Tolsi/69b01acbe7cd79fc93cf to your computer and use it in GitHub Desktop.
Save Tolsi/69b01acbe7cd79fc93cf to your computer and use it in GitHub Desktop.
Interruptible Cancellable Scala Futures
package scala.concurrent.impl
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.control.NonFatal
import scala.util.{Failure, Success}
/**
 * Singleton marker exception used to fail a promise when it is cancelled.
 *
 * Because this is a single shared instance, a captured stack trace would only ever
 * describe the place where the object was first initialised, not the place where a
 * cancellation happened. `NoStackTrace` suppresses that misleading trace.
 */
object Cancelled extends RuntimeException with scala.util.control.NoStackTrace
/**
 * Minimal contract for something that can be cancelled and queried about it.
 * What cancellation actually entails is left to each implementation.
 */
trait Cancellable {

  /** Reports whether the implementation considers itself cancelled. */
  def isCancelled: Boolean

  /** Requests cancellation; side-effecting, hence the explicit parentheses. */
  def cancel(): Unit
}
/**
 * An `ExecutionContextExecutor` that delegates to a Java `ExecutorService` and exposes
 * shutting that service down through the `Cancellable` interface.
 *
 * @param executorService by-name expression yielding the backing service; it is evaluated
 *                        once, on the first access of `es`
 * @param reporter        callback that receives every failure passed to `reportFailure`;
 *                        defaults to `ExecutionContext.defaultReporter`
 */
class CancellableExecutionContextExecutor(executorService: => ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter) extends ExecutionContextExecutor with Cancellable {

  /** The backing executor service, created lazily from the by-name constructor argument. */
  lazy val es: ExecutorService = executorService

  /**
   * Forwards `cause` to the configured reporter.
   *
   * The reporter has to be applied to `cause`; merely naming it would evaluate to the
   * function value, discard it, and silently drop the failure.
   */
  override def reportFailure(cause: Throwable): Unit = reporter(cause)

  /** Hands `command` to the backing executor service. */
  override def execute(command: Runnable): Unit = es.execute(command)

  /** Calls `shutdownNow()` on the backing service; the list it returns is discarded. */
  override def cancel(): Unit = es.shutdownNow()

  /** Delegates to `isTerminated` of the backing service. */
  override def isCancelled: Boolean = es.isTerminated
}
/**
 * Factory for futures whose underlying task can be cancelled after submission.
 */
object CancellableFuture {

  // Dedicated single-worker context; it only runs the cancellation hook installed by `apply`.
  private lazy val localCtx = ExecutionContext.fromExecutor(new ForkJoinPool(1))

  /**
   * Runnable that evaluates `body` and tries to complete its own promise with the outcome.
   *
   * @param body by-name computation, evaluated when `run` is invoked
   */
  class CancellablePromiseCompletingRunnable[T](body: => T) extends Runnable {

    /** Promise that receives the outcome of `body`, unless it was completed earlier. */
    val promise = new CancellablePromise[T]()

    override def run(): Unit = {
      // Non-fatal exceptions become a Failure; anything else propagates to the caller of run().
      val outcome: scala.util.Try[T] =
        try Success(body) catch { case NonFatal(e) => Failure(e) }
      // tryComplete (not complete): the promise may already have been failed by cancel().
      promise.tryComplete(outcome)
      ()
    }
  }

  /**
   * Submits `body` to the executor service of the implicit context and returns a future
   * that can be cancelled.
   *
   * @param body                                computation to run
   * @param cancellableExecutionContextExecutor context whose `es` receives the task
   * @param mayInterruptIfRunning               passed to `java.util.concurrent.Future.cancel`
   *                                            when the returned future fails with `Cancelled`
   * @return the promise of the submitted runnable, viewed as a `CancellableFuture`
   */
  def apply[T](body: =>T)(implicit cancellableExecutionContextExecutor: CancellableExecutionContextExecutor, mayInterruptIfRunning: Boolean = true): CancellableFuture[T] = {
    val task = new CancellablePromiseCompletingRunnable(body)
    val result = task.promise.cancelableFuture
    val submitted = cancellableExecutionContextExecutor.es.submit(task)
    // Only a failure with the Cancelled marker cancels the submitted task; other outcomes are ignored.
    result.onComplete {
      case Failure(Cancelled) => submitted.cancel(mayInterruptIfRunning)
      case _ => ()
    }(localCtx)
    result
  }
}

/** A `Future` that additionally offers the `Cancellable` operations. */
trait CancellableFuture[T] extends Future[T] with Cancellable
/**
 * A `DefaultPromise` that doubles as its own `CancellableFuture`: cancelling it tries to
 * fail the promise with the `Cancelled` marker.
 */
class CancellablePromise[T] extends Promise.DefaultPromise[T] with CancellableFuture[T] {

  /** This promise itself, typed so callers can use it as a `CancellableFuture`. */
  def cancelableFuture: this.type = this

  /** Tries to fail the promise with `Cancelled`; has no effect if it is already completed. */
  override def cancel(): Unit = {
    tryFailure(Cancelled)
  }

  /**
   * True when the promise has been completed with `Failure(Cancelled)`.
   *
   * Reads the completed result through the public `value` accessor rather than comparing
   * the promise's raw internal state, which can hold things other than a result.
   */
  override def isCancelled: Boolean = value match {
    case Some(Failure(Cancelled)) => true
    case _ => false
  }
}
package ru.tolsi.futures
import java.util.concurrent.Executors
import scala.concurrent.impl.CancellableExecutionContextExecutor
/**
 * Demo helpers that start futures whose body blocks for a number of seconds while
 * printing a progress line once per second.
 */
trait BlockFutures {
  import scala.concurrent.{ExecutionContext, Future}
  import scala.concurrent.impl.CancellableFuture

  /**
   * Starts the blocking body as a plain `Future` on the implicit execution context.
   *
   * @param name label used as prefix of the printed lines
   * @param sec  number of one-second sleeps to perform
   */
  def createBlockFuture(name: String, sec: Int)(implicit executor: ExecutionContext): Future[Unit] =
    Future {
      block(name, sec)
    }

  /**
   * Starts the blocking body as a `CancellableFuture` on the implicit cancellable context.
   *
   * @param name label used as prefix of the printed lines
   * @param sec  number of one-second sleeps to perform
   */
  def createCancellableBlockFuture(name: String, sec: Int)(implicit cancellableExecutionContextExecutor: CancellableExecutionContextExecutor): CancellableFuture[Unit] =
    CancellableFuture {
      block(name, sec)
    }

  /**
   * Sleeps `sec` times for one second, printing a line after each sleep, and prints a
   * start line before and a finish line after. If the thread is interrupted it prints a
   * stop line and returns early.
   */
  private def block(name: String, sec: Int): Unit = {
    println(s"$name: start")
    try {
      for (i <- 1 to sec) {
        Thread.sleep(1000)
        println(s"$name: i'm alive $i")
      }
      println(s"$name finish")
    } catch {
      case _: InterruptedException =>
        println(s"$name stop")
        // Catching InterruptedException clears the interrupt flag; set it again so the
        // code that owns this thread can still observe the interruption.
        Thread.currentThread().interrupt()
    }
  }
}
/**
 * Demo entry point: starts 200 cancellable blocking futures on a 200-thread pool,
 * cancels the first 100 after three seconds, and shuts the pool down three seconds later.
 */
object Futures extends App with BlockFutures {

  // Implicit context picked up by createCancellableBlockFuture.
  implicit val ctx: CancellableExecutionContextExecutor =
    new CancellableExecutionContextExecutor(Executors.newFixedThreadPool(200))

  val cancellableFutures = (1 to 200).map { n =>
    createCancellableBlockFuture(s"thread-$n", 100)
  }

  // Let the tasks print a few progress lines before cancelling the first half.
  Thread.sleep(3000)
  cancellableFutures.take(100).foreach(_.cancel())

  // Let the remaining tasks run a little longer, then cancel the whole context.
  Thread.sleep(3000)
  ctx.cancel()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment