I have been doing some follow-up on TaskLocal and have put together some tests to illustrate the findings.
TaskLocal.bind looks like this:
def bind[R](value: A)(task: Task[R]): Task[R] =
  Task.suspend {
    val saved = ref.value
    ref.update(value)
    // the cleanup occurs in the thread where the task executes.
    task.doOnFinish(_ => restore(saved))
  }
So when you pass a task that is forked, or that eventually crosses an async boundary, the cleanup occurs in the forked thread, leaving the current thread of execution with the local var's altered state. This might not be critical, or even considered a bug, but it is something to keep in mind when using TaskLocal. If you require a pristine current local var, you may need to clean it up manually in your setup.
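To make the mechanics concrete, here is a minimal sketch that uses only the JVM standard library. It is an analogy, not Monix's implementation: a plain ThreadLocal stands in for the local var, a single-thread executor stands in for the forked task, and the names BindCleanupSketch and run are hypothetical. The line that sets the value on the worker thread is an assumption that models the context being carried across the async boundary.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

object BindCleanupSketch {
  // Hypothetical stand-in for the local var: a plain ThreadLocal defaulting to 0.
  private val local = new ThreadLocal[Int] {
    override def initialValue(): Int = 0
  }

  /** Returns (task result, caller's value before manual restore, caller's value after). */
  def run(): (Int, Int, Int) = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      val saved = local.get() // 0 on the calling thread
      local.set(1)            // the "bind" step mutates the calling thread

      val forked = pool.submit(new Callable[Int] {
        def call(): Int = {
          local.set(1) // assumption: models the context being carried to the worker
          try local.get() + 1
          finally local.set(saved) // the restore runs here, on the worker thread only
        }
      })

      val result = forked.get(5, TimeUnit.SECONDS)
      val beforeManualRestore = local.get() // nobody has restored the calling thread yet
      local.set(saved)                      // manual cleanup on the calling thread
      (result, beforeManualRestore, local.get())
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val (result, before, after) = run()
    println(s"result=$result before=$before after=$after") // prints "result=2 before=1 after=0"
  }
}
```

The middle value is the interesting one: the worker restored its own copy, so the calling thread still sees 1 until it restores the saved value itself.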
The tests I wrote for the Monix test suite are the following:
implicit val ec: Scheduler = monix.execution.Scheduler.Implicits.global
implicit val opts = Task.defaultOptions.enableLocalContextPropagation
testAsync("TaskLocal.bind clears local with no async boundary") {
  val local = TaskLocal(0)
  def add(i: Int) = {
    local.read.map(_ + i)
  }
  val res = for {
    _ <- local.read.map(assertEquals(_, 0))
    _ <- local.bind(1)(add(1)).map(assertEquals(_, 2))
    _ <- local.read.map(assertEquals(_, 0))
  } yield ()
  // This works without runAsyncOpt because the task never
  // crosses an async boundary.
  // When local vars are involved, always use runAsyncOpt;
  // runAsync is used here for illustration purposes only.
  res.runAsync
}
testAsync("TaskLocal.bind clears local in forked thread") {
  val local = TaskLocal(0)
  def add(i: Int) = {
    local.read.map(_ + i)
  }
  val res = for {
    _ <- local.read.map(assertEquals(_, 0))
    _ <- local.bind(1)(add(1).executeWithFork).map(assertEquals(_, 2))
    _ <- local.read.map(assertEquals(_, 0))
  } yield ()
  // Must use runAsyncOpt because of the async boundary
  // introduced by the forked task.
  res.runAsyncOpt
}
test("TaskLocal.bind clears current local in current thread") {
  import scala.concurrent.Await
  import scala.concurrent.duration._
  val local = TaskLocal(0)
  def add(i: Int) = {
    local.read.map(_ + i)
  }
  // Basically we do the same checks as before but blocking
  // to explicitly check the status of the local in the
  // current execution thread
  assertEquals(local.read.coeval.value, Right(0))
  val res = Await.result(local.bind(1)(add(1)).runAsyncOpt, 5.seconds)
  assertEquals(res, 2)
  // clears the local as expected because the task didn't reach an async boundary.
  assertEquals(local.read.coeval.value, Right(0))
}
test("TaskLocal.bind does not clear current local in forked thread") {
  import scala.concurrent.Await
  import scala.concurrent.duration._
  val local = TaskLocal(0)
  def add(i: Int) = {
    local.read.map(_ + i)
  }
  // We check the current local var
  assertEquals(local.read.coeval.value, Right(0))
  // bind restores the local in the forked thread,
  // which means the current thread does not get the proper cleanup
  val res = Await.result(local.bind(1)(add(1).executeWithFork).runAsyncOpt, 5.seconds)
  assertEquals(res, 2)
  // And leaves the current local var with 1
  assertEquals(local.read.coeval.value, Right(1))
}
There is a note that illustrates the problem that TaskLocal fixes with bind:
"A small note about garbage collection. As long as a thread exists, it will maintain references to its ThreadLocal variables. If the thread-pool does not recycle threads and a thread goes back into the pool without releasing its TLS then those objects will not be freed. Normally this isn’t an issue, however for larger objects it might be wise to explicitly release them after use, or use a WeakReference if behaviour allows."
You can find the post here
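The explicit release that the note recommends can be sketched with a plain ThreadLocal. This is only an illustration of the advice in the quote, not code from the post or from Monix; ThreadLocalReleaseSketch, withBuffer and isReleased are hypothetical names.

```scala
object ThreadLocalReleaseSketch {
  // Hypothetical large per-thread buffer; no initial value is defined,
  // so an unset slot reads as null.
  private val buffer = new ThreadLocal[Array[Byte]]

  /** Runs `f` with a buffer installed for the current thread, then releases it. */
  def withBuffer[A](size: Int)(f: Array[Byte] => A): A = {
    buffer.set(new Array[Byte](size))
    try f(buffer.get())
    finally buffer.remove() // drop this thread's reference so the array can be collected
  }

  /** True when the current thread no longer holds a buffer. */
  def isReleased: Boolean = buffer.get() == null
}
```

Putting the remove() call in a finally block means a pooled thread goes back to the pool without holding the array, even if `f` throws.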
The next step will be to implement a class that helps define a current context around a local var, similar to Finagle's ClientId example.