Last active March 5, 2019 15:06
import cats.effect.ExitCode
import monix.eval.{Task, TaskApp}
import monix.execution.Scheduler
import monix.execution.misc.Local
import cats.implicits._
import monix.execution.schedulers.TracingScheduler
import java.util.concurrent.{CompletableFuture, TimeUnit}
import scala.compat.java8.FutureConverters.toScala
object TaskLocalApp extends TaskApp {
override val scheduler: Scheduler = Scheduler.traced
val io: Scheduler = TracingScheduler("io"))
val globalLocal = Local[Int](0)
val asyncBoundary = Task.unit.executeAsync
val init = Task {
println(s"Starts on thread: ${Thread.currentThread.getName} local: ${globalLocal.get}")
val futureLike = CompletableFuture.runAsync(() => {
println(s"Future like on thread: ${Thread.currentThread.getName} local: ${globalLocal.get}")
val forked =
Task {
println(s"Running on thread: ${Thread.currentThread.getName} local: ${globalLocal.get}")
val onFinish =
Task(println(s"Ends on thread: ${Thread.currentThread.getName} local: ${globalLocal.get}"))
def quarantine[A](task: => Task[A]): Task[A] = {
val old = globalLocal.get { a =>
def quarantine[A](task: => Task[A], scheduler: Scheduler): Task[A] = {
val old = globalLocal.get
task.asyncBoundary(scheduler).map { a =>
def run(args: List[String]): Task[ExitCode] =
init // executes on the default scheduler
// .flatMap(_ => forked) // executed on IO and keeps the Local instance
.flatMap(_ => quarantine(Task.fromFuture(toScala(futureLike)))) // executes on ForkJoinPool and looses the Local without the call to quarantine
.doOnFinish(_ => onFinish) // executes on the default scheduler
  • with forked getting called
Starts on thread: scala-execution-context-global-12 local: 0
Running on thread: io-15 local: 1
Ends on thread: scala-execution-context-global-12 local: 2
  • with unquarantined fromFuture
Starts on thread: scala-execution-context-global-12 local: 0
Future like on thread: ForkJoinPool.commonPool-worker-1 local: 0 // this should be 1 but it's not because it looses Local
Ends on thread: ForkJoinPool.commonPool-worker-1 local: 0 // this should be 1
  • with quarantined fromFuture
Starts on thread: scala-execution-context-global-12 local: 0
Future like on thread: ForkJoinPool.commonPool-worker-1 local: 0 // it still loses the Local but it gets restored
Ends on thread: scala-execution-context-global-12 local: 1 // this is now 1

A more optimal implementation of quarantine is:

def quarantine[A](task: => Task[A], scheduler: Scheduler): Task[A] =
      for {
        old    <- Task(Local.getContext())
        result <- value.asyncBoundary(scheduler)
        _      <- Task(Local.setContext(old))
      } yield result

