@leandrob13
Last active January 12, 2018 19:17

I have been following up on TaskLocal and have put together some tests to illustrate the findings.

TaskLocal.bind looks like this:

  def bind[R](value: A)(task: Task[R]): Task[R] =
    Task.suspend {
      val saved = ref.value
      ref.update(value)
      // The cleanup occurs in the thread where the task executes.
      task.doOnFinish(_ => restore(saved))
    }

So if you pass a task that is forked, or that will eventually cross an async boundary, the cleanup occurs in the forked thread, leaving the current thread of execution with the local var's altered state. This might not be critical, or even considered a bug, but it is something to keep in mind when using TaskLocal. If you need a pristine local var on the current thread, you may have to clean it up manually in your setup.
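The effect described above can be reproduced without Monix. The following is a minimal sketch, assuming a plain `ThreadLocal` as a stand-in for the local var and a hand-rolled copy of the value onto a worker thread as a stand-in for context propagation. It is a simplified model, not Monix's implementation, and the names `ThreadLocalCleanupSketch`, `run` and `manualCleanup` are made up for illustration.

```scala
import java.util.concurrent.{Callable, Executors}

object ThreadLocalCleanupSketch {
  // Stand-in for the local var (an assumption, not how Monix stores locals).
  private val local: ThreadLocal[Int] = new ThreadLocal[Int] {
    override def initialValue(): Int = 0
  }

  /** Returns (value computed by the forked work, value left on the calling thread). */
  def run(manualCleanup: Boolean): (Int, Int) = {
    local.remove() // start from the initial value, 0
    val pool = Executors.newSingleThreadExecutor()
    try {
      val saved = local.get()
      local.set(1)                  // the update happens on the calling thread
      val propagated = local.get()  // value carried over to the worker thread
      val seen = pool.submit(new Callable[Int] {
        def call(): Int = {
          local.set(propagated)
          try local.get() + 1
          finally local.set(saved)  // cleanup runs on the worker thread only
        }
      }).get()
      if (manualCleanup) local.set(saved) // restore on the calling thread as well
      (seen, local.get())
    } finally pool.shutdown()
  }
}
```

With `manualCleanup = false` the calling thread keeps the altered value, which mirrors the forked case; with `manualCleanup = true` the caller restores its own copy, which is the kind of manual cleaning mentioned above.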

The tests I wrote for the Monix test suite are the following:

  implicit val ec: Scheduler = monix.execution.Scheduler.Implicits.global
  implicit val opts = Task.defaultOptions.enableLocalContextPropagation

  testAsync("TaskLocal.bind clears local with no async boundary") {
    val local = TaskLocal(0)

    def add(i: Int) = {
      local.read.map(_ + i)
    }

    val res = for {
      _ <- local.read.map(assertEquals(_, 0))
      _ <- local.bind(1)(add(1)).map(assertEquals(_, 2))
      _ <- local.read.map(assertEquals(_, 0))
    } yield ()

    // Works without runAsyncOpt because the task never
    // crosses an async boundary.
    // When local vars are involved, always use runAsyncOpt;
    // plain runAsync is used here for illustration only.
    res.runAsync

  }

  testAsync("TaskLocal.bind clears local in forked thread") {
    val local = TaskLocal(0)

    def add(i: Int) = {
      local.read.map(_ + i)
    }

    val res = for {
      _ <- local.read.map(assertEquals(_, 0))
      _ <- local.bind(1)(add(1).executeWithFork).map(assertEquals(_, 2))
      _ <- local.read.map(assertEquals(_, 0))
    } yield ()

    // Must use runAsyncOpt because of the async boundary
    // introduced by the forked task.
    res.runAsyncOpt

  }

  test("TaskLocal.bind clears current local in current thread") {
    import scala.concurrent.Await
    import scala.concurrent.duration._

    val local = TaskLocal(0)

    def add(i: Int) = {
      local.read.map(_ + i)
    }

    // Basically we do the same checks as before but blocking
    // to explicitly check the status of the local in the
    // current execution thread

    assertEquals(local.read.coeval.value, Right(0))

    val res = Await.result(local.bind(1)(add(1)).runAsyncOpt, 5.seconds)

    assertEquals(res, 2)

    // clears the local as expected because the task didn't reach an async boundary.
    assertEquals(local.read.coeval.value, Right(0))

  }

  test("TaskLocal.bind does not clear current local in forked thread") {
    import scala.concurrent.Await
    import scala.concurrent.duration._

    val local = TaskLocal(0)

    def add(i: Int) = {
      local.read.map(_ + i)
    }

    // Check the current local var before binding
    assertEquals(local.read.coeval.value, Right(0))

    // bind restores the local in the forked thread,
    // which means the current thread is never cleaned up
    val res = Await.result(local.bind(1)(add(1).executeWithFork).runAsyncOpt, 5.seconds)

    assertEquals(res, 2)

    // And leaves the current local var with 1
    assertEquals(local.read.coeval.value, Right(1))
  }

The following note illustrates the problem that TaskLocal addresses with bind:

"A small note about garbage collection. As long as a thread exists, it will maintain references to its ThreadLocal variables. If the thread-pool does not recycle threads and a thread goes back into the pool without releasing its TLS then those objects will not be freed. Normally this isn’t an issue, however for larger objects it might be wise to explicitly release them after use, or use a WeakReference if behaviour allows."

You can find the post here
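The point made in the quoted note can be shown with a small sketch, assuming a plain `ThreadLocal` and a single-threaded pool so that the same worker thread is reused by every task. The names `ThreadLocalReleaseSketch`, `buffer` and `onWorker` are hypothetical and exist only for this illustration.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors}

object ThreadLocalReleaseSketch {
  private val buffer = new ThreadLocal[Array[Byte]] // initial value is null

  // Runs `body` on the pool and blocks for its result.
  private def onWorker[A](pool: ExecutorService)(body: => A): A =
    pool.submit(new Callable[A] { def call(): A = body }).get()

  /** Returns (still held when not released, still held after explicit release). */
  def run(): (Boolean, Boolean) = {
    val pool = Executors.newSingleThreadExecutor() // one worker, reused by every task
    try {
      // The task sets the thread-local and returns to the pool without releasing it.
      onWorker(pool) { buffer.set(new Array[Byte](1024)) }
      val heldWithoutRelease = onWorker(pool) { buffer.get() != null }

      // The task releases the thread-local before the thread goes back to the pool.
      onWorker(pool) {
        buffer.set(new Array[Byte](1024))
        try buffer.get().length
        finally buffer.remove()
      }
      val heldAfterRelease = onWorker(pool) { buffer.get() != null }

      (heldWithoutRelease, heldAfterRelease)
    } finally pool.shutdown()
  }
}
```

In the first half the pooled thread still references the array when the next task runs on it; in the second half the `finally` block releases it, which is the kind of restore step that bind performs when the task finishes.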

The next step is to implement a class that helps define a current context for a local var, similar to Finagle's ClientId example.
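As a rough idea of what such a class could look like, here is a minimal sketch that uses the standard library's `scala.util.DynamicVariable` instead of TaskLocal. It is neither the planned implementation nor Finagle's code: the names `RequestContext`, `current` and `let` are hypothetical, and this version only scopes the value on the calling thread, with no propagation across async boundaries.

```scala
import scala.util.DynamicVariable

// Hypothetical context value; the field is for illustration only.
final case class RequestContext(clientName: String)

object RequestContext {
  // Holds the context for the current thread; None when nothing is bound.
  private val holder = new DynamicVariable[Option[RequestContext]](None)

  /** The context bound on the current thread, if any. */
  def current: Option[RequestContext] = holder.value

  /** Binds `ctx` while `body` runs, then restores the previous value. */
  def let[R](ctx: RequestContext)(body: => R): R =
    holder.withValue(Some(ctx))(body)
}
```

A TaskLocal-based version would need the same two operations, with `let` delegating to bind so that the restore step follows the task rather than the calling thread.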
