Skip to content

Instantly share code, notes, and snippets.

@olix0r
Created September 8, 2016 02:17
Show Gist options
  • Save olix0r/ce41fa31816fbf21cf6b8e5c3c59bf19 to your computer and use it in GitHub Desktop.
Save olix0r/ce41fa31816fbf21cf6b8e5c3c59bf19 to your computer and use it in GitHub Desktop.
Future cancellation by example
import com.twitter.conversions.time._
import com.twitter.finagle.util.DefaultTimer
import com.twitter.util._
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NoStackTrace
/**
 * Demonstrates what happens when a `Promise` is interrupted (cancelled) and
 * satisfied at roughly the same moment.
 *
 * Each call to `run()` sets up 10000 promises. For every promise, two tasks
 * on `pool` race: one raises a `Cancelled` interrupt, the other tries to
 * complete the promise with `"ok!"`. A summary of how many promises ended up
 * as a `Return` versus a `Throw` is printed.
 */
object Cancellator {

  /** Interrupt signal raised against each promise; created without a stack trace. */
  class Cancelled(n: Int) extends NoStackTrace

  /** Pool on which the two racing tasks for every promise are scheduled. */
  val pool = FuturePool.unboundedPool

  /**
   * Runs the race for 10000 promises, blocks until all of them are resolved,
   * and prints a line of the form `results: <ok> ok, <errors> errors`.
   */
  def run(): Unit = {
    val resultFs = for (i <- 0 until 10000) yield {
      val promise = new Promise[String]

      // Raising an interrupt only invokes this handler; the handler itself
      // has to try to fail the promise (a no-op if a value is already set).
      promise.setInterruptHandler {
        case cause: Throwable =>
          val _ = promise.updateIfEmpty(Throw(cause))
      }

      // Both tasks check in on this counter and busy-wait until the other
      // one has checked in too, so the interrupt and the value are attempted
      // as close to simultaneously as possible.
      val arrived = new AtomicInteger(0)
      def rendezvous(): Unit = {
        arrived.incrementAndGet()
        while (arrived.get != 2) {}
      }

      // Contender 1: interrupt the promise.
      pool {
        rendezvous()
        promise.raise(new Cancelled(i))
      }

      // Contender 2: satisfy the promise with a value.
      pool {
        rendezvous()
        val _ = promise.updateIfEmpty(Return("ok!"))
      }

      // Expose the outcome as a Try so a Throw does not fail the collected future.
      promise.liftToTry
    }

    // Tally (returns, throws) once every promise has been resolved.
    val countsF = Future.collect(resultFs).map { outcomes =>
      val (failed, succeeded) = outcomes.partition(_.isThrow)
      (succeeded.size, failed.size)
    }

    val (okCount, errorCount) = Await.result(countsF)
    println(s"results: $okCount ok, $errorCount errors")
  }
}
scala> (0 to 10).foreach(_ => Cancellator.run())
results: 9598 ok, 402 errors
results: 9572 ok, 428 errors
results: 9603 ok, 397 errors
results: 9393 ok, 607 errors
results: 9481 ok, 519 errors
results: 9564 ok, 436 errors
results: 9270 ok, 730 errors
results: 9437 ok, 563 errors
results: 9549 ok, 451 errors
results: 9328 ok, 672 errors
results: 9210 ok, 790 errors
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment