@mihaisoloi
Last active March 5, 2019 15:06
import cats.effect.ExitCode
import monix.eval.{Task, TaskApp}
import monix.execution.Scheduler
import monix.execution.misc.Local
import cats.implicits._
import monix.execution.schedulers.TracingScheduler
import java.util.concurrent.{CompletableFuture, TimeUnit}
import scala.compat.java8.FutureConverters.toScala
object TaskLocalApp extends TaskApp {
  // Both schedulers are tracing schedulers, so they propagate the Local context.
  override val scheduler: Scheduler = Scheduler.traced
  val io: Scheduler = TracingScheduler(Scheduler.io("io"))

  val globalLocal = Local[Int](0)
  val asyncBoundary = Task.unit.executeAsync

  val init = Task {
    println(s"Starts on thread: ${Thread.currentThread.getName} local: ${globalLocal.get}")
    globalLocal.update(1)
  }.executeWithOptions(_.enableLocalContextPropagation)

  // Runs on ForkJoinPool.commonPool, which knows nothing about the Local context.
  val futureLike = CompletableFuture.runAsync(() => {
    TimeUnit.SECONDS.sleep(1)
    println(s"Future like on thread: ${Thread.currentThread.getName} local: ${globalLocal.get}")
  })

  val forked =
    Task {
      println(s"Running on thread: ${Thread.currentThread.getName} local: ${globalLocal.get}")
      globalLocal.update(2)
    }.executeOn(io)

  val onFinish =
    Task(println(s"Ends on thread: ${Thread.currentThread.getName} local: ${globalLocal.get}"))

  // Remembers the current value, runs the task, shifts back to the default
  // scheduler and restores the remembered value.
  def quarantine[A](task: => Task[A]): Task[A] = {
    val old = globalLocal.get
    task.asyncBoundary.map { a =>
      globalLocal.update(old)
      a
    }
  }

  // Same as above, but shifts back to the given scheduler.
  def quarantine[A](task: => Task[A], scheduler: Scheduler): Task[A] = {
    val old = globalLocal.get
    task.asyncBoundary(scheduler).map { a =>
      globalLocal.update(old)
      a
    }
  }

  def run(args: List[String]): Task[ExitCode] =
    init // executes on the default scheduler
      // .flatMap(_ => forked) // executes on IO and keeps the Local instance
      // executes on ForkJoinPool and loses the Local without the call to quarantine
      .flatMap(_ => quarantine(Task.fromFuture(toScala(futureLike))))
      .doOnFinish(_ => onFinish) // executes on the default scheduler
      .as(ExitCode.Success)
}
@mihaisoloi
Author

Output:

  • with forked getting called
Starts on thread: scala-execution-context-global-12 local: 0
Running on thread: io-15 local: 1
Ends on thread: scala-execution-context-global-12 local: 2
  • with unquarantined fromFuture
Starts on thread: scala-execution-context-global-12 local: 0
Future like on thread: ForkJoinPool.commonPool-worker-1 local: 0 // this should be 1 but it's not because it loses the Local
Ends on thread: ForkJoinPool.commonPool-worker-1 local: 0 // this should be 1
  • with quarantined fromFuture
Starts on thread: scala-execution-context-global-12 local: 0
Future like on thread: ForkJoinPool.commonPool-worker-1 local: 0 // it still loses the Local but it gets restored
Ends on thread: scala-execution-context-global-12 local: 1 // this is now 1

@mihaisoloi
Author

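A better implementation of quarantine, which captures and restores the whole Local context inside the Task instead of reading globalLocal eagerly, is: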

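def quarantine[A](task: => Task[A], scheduler: Scheduler): Task[A] =
  for {
    old    <- Task(Local.getContext())        // capture the context before leaving
    result <- task.asyncBoundary(scheduler)   // run the task, then shift back to `scheduler`
    _      <- Task(Local.setContext(old))     // restore the captured context
  } yield result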
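
The capture, boundary and restore steps above can be sketched without Monix, which may help show why the restore is needed at all. This is only a simplified model under stated assumptions, not how Monix is implemented: a plain `ThreadLocal` stands in for `Local`, and the hypothetical `TracingExecutor` stands in for a tracing scheduler by copying the submitting thread's value onto the worker thread. The names `QuarantineSketch`, `TracingExecutor` and `valueAfterForeignCall` are made up for this illustration.

```scala
import java.util.concurrent.{CompletableFuture, Executor, ExecutorService, Executors}

object QuarantineSketch {
  // Assumption: a plain ThreadLocal defaulting to 0 stands in for Local[Int](0).
  val local: ThreadLocal[Int] = new ThreadLocal[Int] {
    override def initialValue(): Int = 0
  }

  // Assumption: a stand-in for a tracing scheduler. It captures the submitting
  // thread's value and installs it on the worker thread while the task runs.
  final class TracingExecutor(underlying: Executor) extends Executor {
    override def execute(task: Runnable): Unit = {
      val captured = local.get()
      underlying.execute(new Runnable {
        override def run(): Unit = {
          val previous = local.get()
          local.set(captured)
          try task.run()
          finally local.set(previous)
        }
      })
    }
  }

  // Returns the value observed after hopping back from a foreign thread.
  def valueAfterForeignCall(quarantined: Boolean): Int = {
    val homePool: ExecutorService = Executors.newSingleThreadExecutor()
    val foreignPool: ExecutorService = Executors.newSingleThreadExecutor()
    val home = new TracingExecutor(homePool)
    val observed = new CompletableFuture[Int]()
    try {
      local.set(1)          // plays the role of `init` in the gist
      val old = local.get() // step 1: capture before leaving
      foreignPool.execute(new Runnable {
        override def run(): Unit = {
          // The foreign thread never saw `local.set(1)`, so it reads 0 here.
          // Step 2: shift back home; the value carried over is the foreign 0.
          home.execute(new Runnable {
            override def run(): Unit = {
              if (quarantined) local.set(old) // step 3: restore
              observed.complete(local.get())
              ()
            }
          })
        }
      })
      observed.get()
    } finally {
      homePool.shutdown()
      foreignPool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"unquarantined: ${valueAfterForeignCall(quarantined = false)}") // prints 0
    println(s"quarantined:   ${valueAfterForeignCall(quarantined = true)}")  // prints 1
  }
}
```

In this model the hop back to the home executor is submitted from the foreign thread, so the propagated value is the foreign thread's default. Only the explicit restore brings back the value captured before the call, which matches the difference between the unquarantined and quarantined outputs above.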
