Last active
September 7, 2015 07:21
-
-
Save Tolsi/69b01acbe7cd79fc93cf to your computer and use it in GitHub Desktop.
Interruptible Cancellable Scala Futures
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scala.concurrent.impl | |
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory} | |
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} | |
import scala.concurrent.forkjoin.ForkJoinPool | |
import scala.util.control.NonFatal | |
import scala.util.{Failure, Success} | |
object Cancelled extends RuntimeException | |
trait Cancellable { | |
def cancel(): Unit | |
def isCancelled: Boolean | |
} | |
class CancellableExecutionContextExecutor(executorService: => ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter) extends ExecutionContextExecutor with Cancellable { | |
lazy val es = executorService | |
override def reportFailure(cause: Throwable): Unit = reporter | |
override def execute(command: Runnable): Unit = es.execute(command) | |
override def cancel(): Unit = es.shutdownNow() | |
override def isCancelled: Boolean = es.isTerminated | |
} | |
object CancellableFuture { | |
private lazy val localCtx = ExecutionContext.fromExecutor(new ForkJoinPool(1)) | |
class CancellablePromiseCompletingRunnable[T](body: => T) extends Runnable { | |
val promise = new CancellablePromise[T]() | |
override def run() = { | |
promise tryComplete { | |
try Success(body) catch { case NonFatal(e) => Failure(e) } | |
} | |
} | |
} | |
def apply[T](body: =>T)(implicit cancellableExecutionContextExecutor: CancellableExecutionContextExecutor, mayInterruptIfRunning: Boolean = true): CancellableFuture[T] = { | |
val runnable = new CancellablePromiseCompletingRunnable(body) | |
val f = cancellableExecutionContextExecutor.es.submit(runnable) | |
runnable.promise.cancelableFuture.onFailure{case Cancelled => f.cancel(mayInterruptIfRunning)}(localCtx) | |
runnable.promise.cancelableFuture | |
} | |
} | |
trait CancellableFuture[T] extends Future[T] with Cancellable | |
class CancellablePromise[T] extends Promise.DefaultPromise[T] with CancellableFuture[T] { | |
def cancelableFuture: this.type = this | |
override def cancel(): Unit = { | |
tryFailure(Cancelled) | |
} | |
override def isCancelled: Boolean = getState == Failure(Cancelled) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ru.tolsi.futures | |
import java.util.concurrent.Executors | |
import scala.concurrent.impl.CancellableExecutionContextExecutor | |
trait BlockFutures { | |
import scala.concurrent.ExecutionContext | |
def createBlockFuture(name: String, sec: Int)(implicit executor: ExecutionContext) = { | |
import scala.concurrent.Future | |
Future { | |
block(name, sec) | |
} | |
} | |
def createCancellableBlockFuture(name: String, sec: Int)(implicit cancellableExecutionContextExecutor: CancellableExecutionContextExecutor) = { | |
import scala.concurrent.impl.CancellableFuture | |
CancellableFuture { | |
block(name, sec) | |
} | |
} | |
private def block(name: String, sec: Int) = { | |
println(s"$name: start") | |
try { | |
for {i <- 1 to sec} { | |
Thread.sleep(1000) | |
println(s"$name: i'm alive $i") | |
} | |
println(s"$name finish") | |
} catch { | |
case _: InterruptedException => | |
println(s"$name stop") | |
} | |
} | |
} | |
object Futures extends App with BlockFutures { | |
implicit val ctx = new CancellableExecutionContextExecutor(Executors.newFixedThreadPool(200)) | |
val cancellableFutures = for {i <- 1 to 200} yield { | |
createCancellableBlockFuture(s"thread-$i", 100) | |
} | |
Thread.sleep(3000) | |
cancellableFutures.zipWithIndex.foreach{ | |
case (f, i) => if (i < 100) f.cancel() | |
} | |
Thread.sleep(3000) | |
ctx.cancel() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment