
@alexandru
Last active April 1, 2018 14:57

Task: A diverging design from Future and Scalaz Task

Monix Task: A diverging design

This document is about Task, an alternative to Scalaz's Task and Scala's Future. Note that this is a work in progress and that this document has already been revised several times:

The underlying design remains the same, a design inspired by Observable. The implementation currently consists of:

So what's wrong with Scalaz Task?

The Task in Scalaz is a convenience wrapper around Scalaz's Future. The implementation is quite ingenious: Future (by means of the Now, Async, Suspend and Bind states) describes the stages of a trampoline. You can see this in its API:

  • Task.apply(f: => A) forks a thread (yielding an Async state)
  • Task.delay(f: => A) executes f on the current thread, trampolined (yielding a Suspend(Now(f)) state)
  • Task.now(a) is simply returning a (yielding a Now state)
  • task.flatMap(f) yields either BindSuspend or BindAsync, depending on what kind of future f produces (immediate or something that should fork another thread)
  • Task.fork(f: => Task[A]) executes f on another thread (yielding a BindAsync I think, because it does a join after the initial Suspend)
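To make the trampoline idea concrete, here is a minimal, standard-library-only sketch. It is not Scalaz's code: the names Tramp, Now, Suspend, Bind and runTramp are hypothetical, and the Async state (the one that actually forks) is left out. It shows why a trampoline is stack-safe: the run loop keeps pending continuations in a heap-allocated list instead of on the JVM call stack, and everything runs on the thread that calls runTramp.

```scala
// Hypothetical trampoline, loosely modeled on the states listed above.
// Now = an available value, Suspend = a deferred step,
// Bind = "run source, then continue with f". Async is omitted.
sealed trait Tramp[A] {
  def flatMap[B](f: A => Tramp[B]): Tramp[B] = Bind(this, f)
  def map[B](f: A => B): Tramp[B] = flatMap(a => Now(f(a)))
}
final case class Now[A](value: A) extends Tramp[A]
final case class Suspend[A](thunk: () => Tramp[A]) extends Tramp[A]
final case class Bind[X, A](source: Tramp[X], f: X => Tramp[A]) extends Tramp[A]

// The run loop stores pending continuations in a List (on the heap),
// so deeply nested flatMaps do not grow the JVM call stack.
def runTramp[A](t: Tramp[A]): A = {
  var current: Tramp[Any] = t.asInstanceOf[Tramp[Any]]
  var continuations: List[Any => Tramp[Any]] = Nil
  while (true) {
    current match {
      case Now(value) =>
        continuations match {
          case Nil => return value.asInstanceOf[A]
          case k :: rest =>
            current = k(value)
            continuations = rest
        }
      case Suspend(thunk) =>
        current = thunk()
      case Bind(source, f) =>
        current = source.asInstanceOf[Tramp[Any]]
        continuations = f.asInstanceOf[Any => Tramp[Any]] :: continuations
    }
  }
  throw new IllegalStateException("unreachable")
}

// Example: a non-tail-recursive sum expressed through nested flatMaps.
def sumTo(n: Int): Tramp[Long] =
  if (n == 0) Now(0L)
  else Suspend(() => sumTo(n - 1)).map(_ + n)
```

Note that nothing in this sketch leaves the calling thread, which is the "trampolined, not asynchronous" behavior discussed in the next sections.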

In general it's a good implementation, however I believe that the API could do better. The summary is:

  1. API is leaking implementation details and requires manual intervention
  2. Has a dual synchronous / asynchronous personality
  3. Has liveness issues, important especially for JavaScript
  4. Has performance issues when forking logical threads
  5. Is unreliable for canceling running tasks
  6. Has unreliable thread handling

In more detail:

1) API is leaking implementation details and requires manual intervention

The Future (and consequently Task) in Scalaz is not about asynchronous execution, but about trampolined execution; the distinction is subtle, but you can see it in its API:

  • by default stuff gets executed on the current thread
  • Task.fork can then force the execution to jump to another thread, but that's too much manual intervention for users: the only case in which this is user-friendly is when you don't want stuff to block the thread from which you call .run, in which case doing a Task.fork is a no-brainer
  • but for asynchronous stuff you never want to block the thread on which you .run

2) Has dual synchronous / asynchronous personality

Even though trampolined execution lends itself to asynchronicity, the current thread can be used (indefinitely), so the Scalaz Task has been used for getting results synchronously as well (which is why def run: A exists). It's been said that the root of all evil on top of the JVM is unclear boundaries between synchronous and asynchronous callbacks.

3) Has liveness issues, important especially for JavaScript

  • for asynchronous stuff it's a good idea to not block the thread where you .run
  • it's a good idea to split synchronous loops into batches such that no thread gets blocked indefinitely; you can do this with Scalaz by inserting Task.fork here and there, but it's a manual operation
  • this is especially important on top of Javascript, where concurrency is cooperative and where by blocking the run-loop you're blocking everything, including the browser's UI
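A rough sketch of the batching idea, using only the standard library and assuming a plain ExecutionContext in place of a Scheduler: batchedLoop, its parameters and the fork counter are hypothetical names for illustration. The loop runs at most batchSize steps synchronously, then re-submits the rest to the pool, so the thread (or the JavaScript run-loop) is released between batches without the user inserting forks by hand.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Hypothetical helper: runs step(n), step(n - 1), ..., step(1), but only
// `batchSize` steps at a time; after each batch it re-submits itself to
// the ExecutionContext so other queued work gets a turn.
def batchedLoop(n: Int, batchSize: Int, forks: AtomicInteger)(step: Int => Unit)
    (implicit ec: ExecutionContext): Future[Unit] = {
  val done = Promise[Unit]()

  // Every asynchronous boundary is counted, including the initial one.
  def fork(from: Int): Unit = {
    forks.incrementAndGet()
    ec.execute(new Runnable { def run(): Unit = runBatch(from) })
  }

  def runBatch(from: Int): Unit = {
    var i = from
    var inBatch = 0
    while (i > 0 && inBatch < batchSize) {
      step(i)
      i -= 1
      inBatch += 1
    }
    if (i == 0) done.success(()) else fork(i)
  }

  fork(n)
  done.future
}
```

With n = 20400, the counter ends at 40 for a batch size of 512 and at 80 for 256, which lines up with the ~40 / ~80 figures quoted for loop(20400) further down, assuming Monix forks once per batch.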

4) Performance issues when forking logical threads

Scalaz Task ends up being very efficient when running tail-recursive loops on the same thread, but when you fork threads I've seen really bad throughput compared with both my implementation and Scala's Future. Maybe I'm doing something wrong.

Here's the stupid benchmark I'm doing: alexandru/39c124d0b8e7c66dc5c2.

5) Unreliable for canceling running tasks

Tasks are often used for expensive operations that can be canceled (e.g. requests to external services, disk I/O, tail-recursive loops, etc.), so it would be cool to be able to cancel a running task. Scalaz's way of doing this is through unsafePerformAsyncInterruptibly(f, AtomicBoolean), but that's ugly and does not address executing special logic on canceling (like closing file handles, etc.).

So with the Scalaz Task you end up doing:

import java.util.concurrent.atomic.AtomicBoolean

val cancel = new AtomicBoolean(false)
task.unsafePerformAsyncInterruptibly(callback, cancel)
// ...
cancel.set(true) // only checked by the trampoline between steps

This is unsafe and a big hack: it only stops the trampoline, but it does not stop in-progress I/O operations, it cannot close file handles, etc. Here's Monix's current API:

import monix.execution.Scheduler.Implicits.global

val result = task.runAsync
// ...
// oops, changed my mind
result.cancel()
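To illustrate the difference, here is a standard-library-only sketch of cancelation that carries its own cleanup logic. Cancelable and runAfter are hypothetical stand-ins, not Monix's API: canceling removes the pending timer from the ScheduledExecutorService instead of flipping a flag that a run-loop may or may not check later, and the same hook could just as well close a file handle or abort a request.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical minimal Cancelable (not Monix's trait): cancel() is
// idempotent and runs the cleanup supplied by whoever started the work.
trait Cancelable { def cancel(): Unit }

object Cancelable {
  def apply(onCancel: () => Unit): Cancelable = new Cancelable {
    private val canceled = new AtomicBoolean(false)
    def cancel(): Unit = if (canceled.compareAndSet(false, true)) onCancel()
  }
}

// Hypothetical producer: schedules `f` after `delayMillis` and hands back
// a Cancelable whose cleanup removes the pending timer.
def runAfter[A](timer: ScheduledExecutorService, delayMillis: Long)(f: => A)(cb: A => Unit): Cancelable = {
  val pending = timer.schedule(new Runnable { def run(): Unit = cb(f) }, delayMillis, TimeUnit.MILLISECONDS)
  Cancelable(() => pending.cancel(false))
}
```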

6) Unreliable thread handling

The implementation of Scalaz Future and Task is (currently) unreliable in terms of thread handling. For example, Scalaz Future contains code that blocks threads, and it does so in an unsafe manner. It's unsafe because whenever you block threads, you should be forced to specify a timeout. It's also unsafe (compared with Await.result) because you can exhaust all the threads in your thread-pool, ending up with deadlocks. Here's a gem:

  def unsafeStart: Future[A] = {
    val latch = new java.util.concurrent.CountDownLatch(1)
    @volatile var result: Option[A] = None
    unsafePerformAsync { a => result = Some(a); latch.countDown }
    delay { latch.await; result.get }
  }
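For comparison, here is a sketch of the same callback-to-blocking bridge with a mandatory timeout. The awaitCallback helper is hypothetical; it relies on the standard Promise and Await.result, which wraps the wait in scala.concurrent.blocking and throws a java.util.concurrent.TimeoutException when the deadline passes, instead of waiting forever on a latch.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical helper: bridges a callback-based API to a blocking call.
// The timeout is a required parameter, and Await.result marks the wait as
// `blocking`, so a BlockContext-aware pool can add threads meanwhile.
def awaitCallback[A](register: (Try[A] => Unit) => Unit, timeout: FiniteDuration): A = {
  val result = Promise[A]()
  register(outcome => result.tryComplete(outcome))
  Await.result(result.future, timeout) // throws TimeoutException on expiry
}
```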

Versus Scala's Future

I view Task as complementary to Scala's Future, not as a replacement. The problem we are trying to solve is that Future represents a running computation that memoizes its result. It's perfect for its primary use-cases, but it is sometimes misused and misunderstood.

  • every one of Future's operators takes an implicit ExecutionContext (making it slightly incompatible with various type-class interfaces)
  • the reason is that every one of Future's operators is side-effecting, immediately sending a task in a thread-pool for execution (this actually makes sense when you're doing memoization)
  • it goes without saying that Future does not have referential transparency
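A small illustration of the eager vs. lazy distinction, using only the standard library: Future.apply submits its body for execution as soon as it is constructed and memoizes the result, while a lazy description (here simply a () => Future[Int], standing in for a Task) does nothing until invoked and repeats its side effect on every run.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val ec: ExecutionContext = ExecutionContext.global

// Eager: the increment is scheduled right here, exactly once.
val futureRuns = new AtomicInteger(0)
val fut: Future[Int] = Future { futureRuns.incrementAndGet() }

// Lazy stand-in for a Task: nothing happens until lazyTask() is called,
// and every call performs the side effect again.
val lazyRuns = new AtomicInteger(0)
val lazyTask: () => Future[Int] = () => Future { lazyRuns.incrementAndGet() }
```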

Monix's Task fixes these problems. However, for the FP purists, do note that Monix's Task still breaks the left identity monad law: the law does not hold for functions f that throw exceptions (just like with Try and Future before it):

// for some f that throws instead of returning a failed Task
def f(t: T): Task[U] = ???
Task(t).flatMap(f) === f(t) // not true

But for me that's unavoidable and I don't even want to attempt fixing that just to tick a checkbox, as Task is about asynchronous execution and that means dealing with non-determinism (on the underlying Java or Javascript platforms).
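The same breakage can be reproduced with scala.util.Try, which the paragraph above mentions. In this sketch, f is a hypothetical function that throws instead of returning a Failure: the left side of the law, Try(1).flatMap(f), yields a Failure because flatMap catches non-fatal exceptions, while the right side, f(1), throws, so the two sides cannot be equal.

```scala
import scala.util.{Success, Try}

// Hypothetical misbehaving function: throws instead of returning a Failure.
val f: Int => Try[Int] = x =>
  if (x > 0) throw new IllegalArgumentException("boom") else Success(x)

// Left side of the law: flatMap catches the exception and returns a Failure.
val left: Try[Int] = Try(1).flatMap(f)

// Right side of the law: calling f directly throws, so no value is produced.
val rightThrows: Boolean =
  try { f(1); false } catch { case _: IllegalArgumentException => true }
```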

API & Usage

To answer a question, x1 and x2 will produce the same result in both Monix and Scalaz:

val x1 =
  for {
    _ <- Task { println("hi") }
    _ <- Task { println("hi") }
  } yield ()
  
val hi = Task { println("hi") }
val x2 =
  for {
    _ <- hi
    _ <- hi
  } yield ()
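To see why x1 and x2 are equivalent, here is a minimal lazy task. LazyTask is a hypothetical toy, not Monix's implementation (it is neither asynchronous nor stack-safe), and it counts effects instead of printing. Because a task is only a description, reusing the value hi runs its effect once per use, exactly as if the Task { ... } expression had been written twice.

```scala
// Hypothetical toy: a description of a computation that only runs
// when unsafeRun() is called. Not asynchronous and not stack-safe.
final class LazyTask[A](val unsafeRun: () => A) {
  def flatMap[B](f: A => LazyTask[B]): LazyTask[B] =
    new LazyTask(() => f(unsafeRun()).unsafeRun())
  def map[B](f: A => B): LazyTask[B] =
    new LazyTask(() => f(unsafeRun()))
}
object LazyTask {
  def apply[A](a: => A): LazyTask[A] = new LazyTask(() => a)
}

var printed = 0 // counts "hi" effects instead of printing them

val x1 = for {
  _ <- LazyTask { printed += 1 }
  _ <- LazyTask { printed += 1 }
} yield ()

val hi = LazyTask { printed += 1 }
val x2 = for {
  _ <- hi
  _ <- hi
} yield ()
```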

The effect will be different however. Upon .runAsync Monix will fork a single thread, whereas Scalaz will fork twice. But it gets better:

def loop(n: Int): Task[Unit] = 
  Task(println(n)).flatMap(_ => loop(n - 1))
  
// this forks a single thread in Monix and ~100 in Scalaz
loop(100).runAsync

// this forks ~40 threads in Monix on the JVM and ~80 tasks on JS 
// (by means of setTimeout), but ~20400 logical threads in Scalaz,
// freezing the run-loop in Javascript for the duration of that loop
loop(20400).runAsync

This is where Monix and Scalaz diverge in philosophy. To fix the above in Scalaz, you'd have to use Task.delay and then introduce Task.fork instructions from time to time. But that's a manual intervention that requires knowledge about how Scalaz Task works, and it can smell either like premature optimization or like a bug fix for a process that freezes the system.

Here's another difference in the signature of .apply():

// Scalaz
def apply[A](a: => A)(implicit pool: ExecutorService): Task[A]

// Monix
def apply[T](f: => T): Task[T]

As a matter of philosophy, the difference is that in Monix the execution of a Task is ALWAYS asynchronous, meaning that we expect tasks to fork at any moment and the user does not control the forking; that's why the Scheduler is only taken on runAsync. Also, Monix uses Scheduler, an enhanced ExecutionContext that's also supported on Scala.js, whereas Scalaz relies on Java's ExecutorService.
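A sketch of that design choice with only the standard library: AsyncTask is a hypothetical type whose constructor and map need no thread-pool at all, and the ExecutionContext (standing in for Monix's Scheduler) is only demanded by runAsync. Under this assumption, the same task value can be run on different pools without being rebuilt.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical lazy task: building and transforming it needs no pool;
// the ExecutionContext is only required at run time.
final class AsyncTask[A] private (thunk: () => A) {
  def map[B](f: A => B): AsyncTask[B] = new AsyncTask(() => f(thunk()))
  def runAsync(implicit ec: ExecutionContext): Future[A] = Future(thunk())
}

object AsyncTask {
  def apply[A](a: => A): AsyncTask[A] = new AsyncTask(() => a)
}
```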

Here come the operators. This code is valid with both, but Monix's memory usage is more efficient (for the time being):

// Yes, this uses a lot of memory (because of that map),
// but should not trigger stack overflows.
// Also, remove the map, and you've got a tail recursion.

def sum(n: Int, acc: Long = 0): Task[Long] = {
  if (n == 0) Task(acc) else
    Task(n).flatMap(x => sum(x-1, acc + x).map(_ + 1))
}

We can recover from an error (similar to Future.recover and Future.recoverWith; the same thing is possible with the Scalaz Task):

// Monix
monixTask.onErrorRecover {
  case _: SomeException => 
    "fallback value"
}

monixTask.onErrorRecoverWith {
  case _: SomeException => 
    Task("fallback value")
}
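Since these operators mirror the standard library, here is the Future equivalent as a reference point. SomeException is a placeholder class, as in the snippet above. Keep the difference in mind: the Future below starts running as soon as it is defined, whereas a Task would only run once runAsync is called.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val ec: ExecutionContext = ExecutionContext.global

// Placeholder exception type, as in the Monix snippet.
class SomeException extends Exception("boom")

val failing: Future[String] = Future(throw new SomeException)

// recover: map the error to a plain fallback value.
val recovered = failing.recover { case _: SomeException => "fallback value" }

// recoverWith: map the error to another Future.
val recoveredWith = failing.recoverWith { case _: SomeException => Future("fallback value") }
```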

We can delay execution:

// Monix
task.delay(10.minutes)

// Scalaz
scalazTask.after(10.minutes)

We can interrupt a task if it takes too long to execute:

// will fail with a `TimeoutException` after 10 seconds
monixTask.timeout(10.seconds)

// will fallback to a backup after 10 seconds of inactivity
monixTask.timeout(10.seconds, Task("another value"))
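A rough standard-library-only sketch of how a timeout with a fallback can be built on a timer. timeoutTo is a hypothetical helper, not Monix's implementation: whichever finishes first, the source or the timer, completes the result, and the other is ignored. One important difference from Monix is that a Future cannot be canceled, so the slow computation keeps running in the background.

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration

// Hypothetical helper: completes with `fallback` if `source` has not
// completed within `after`; otherwise cancels the timer and uses `source`.
def timeoutTo[A](source: Future[A], after: FiniteDuration, fallback: => Future[A],
                 timer: ScheduledExecutorService)(implicit ec: ExecutionContext): Future[A] = {
  val result = Promise[A]()
  val timeoutTask = timer.schedule(new Runnable {
    def run(): Unit = result.tryCompleteWith(fallback)
  }, after.toMillis, TimeUnit.MILLISECONDS)
  source.onComplete { outcome =>
    timeoutTask.cancel(false)
    result.tryComplete(outcome)
  }
  result.future
}
```

Passing Future.failed(new java.util.concurrent.TimeoutException(...)) as the fallback gives the plain timeout(10.seconds) behavior.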

We can zip 2 tasks:

val a = Task(1)
val b = Task(2)

val c: Task[(Int, Int)] = a.zip(b)

Unlike Future, it doesn't execute immediately; just like its Scalaz counterpart, it has to be run explicitly before producing its result and its side-effects:

// Execution with Monix (nothing happened until now);
// Monix's Task always requires an execution context on run
import monix.execution.Scheduler.Implicits.global
import scala.util.{Failure, Success}

monixTask.runAsync {
  case Success(value) => println(value)
  case Failure(ex) => ex.printStackTrace()
}

// Execution with Scalaz
scalazTask.unsafePerformAsync {
  case -\/(ex) => ex.printStackTrace
  case \/-(value) => println(value)
}

Blocking the current thread for the result:

// in Monix
Await.result(monixTask.runAsync, 10.seconds)

// in Scalaz
task.unsafePerformSyncFor(10.minutes)

Monix's method is actually better because we're piggybacking on Await.result from the standard library. This has the benefit that it communicates with Scala's (or soon to be Java's) ForkJoin pool, exposed by the default ExecutionContext, to maybe add more threads in case a blocking operation occurs (God forbid). It does so by means of the blocking block, in combination with Scala's BlockContext. And that's pretty cool.
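A quick standard-library-only illustration of the blocking/BlockContext mechanism: wrapping a blocking call in scala.concurrent.blocking lets the global ForkJoin-based ExecutionContext spawn compensating threads, so submitting more sleeping tasks than there are cores does not starve the pool. The task count and sleep duration are arbitrary choices for the example.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

implicit val ec: ExecutionContext = ExecutionContext.global

// Four times as many sleeping tasks as there are cores; `blocking`
// signals the pool that it may add threads while these calls wait.
val n = Runtime.getRuntime.availableProcessors * 4
val tasks = (1 to n).map(i => Future { blocking { Thread.sleep(200); i } })
val all = Future.sequence(tasks)
```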

I'm also quite fond of my integration with Scala's Future. If you have to memoize the result in order to share it, then Future is there and does what it's supposed to. Plus it surely beats callbacks. So Monix's Task has the following runAsync signature:

import monix.execution.Scheduler.Implicits.global

val task: Task[T] = ???
val future: Future[T] = task.runAsync

// conversion works both ways
val anotherTask = Task.fromFuture(Future("hello"))

And with Monix's Task canceling running computations is a first-class concept:

val result = task.runAsync
// ... later ...
result.cancel()

Implementation details

A Task models a producer/consumer relationship in which the producer pushes a single value to the consumer on success, or an exception on error and then stops. Thus we can have a consumer interface that mirrors the Observer in Rx, like so:

abstract class Callback[-T] { self =>
  def onSuccess(value: T, stackDepth: Int): Unit
  def onError(ex: Throwable, stackDepth: Int): Unit
  
  def safeOnSuccess(s: Scheduler, stackDepth: Int, value: T): Unit = ???
  def safeOnError(s: Scheduler, stackDepth: Int, ex: Throwable): Unit = ???
}

This type is not meant for users; they'll never actually see it. The contract is this:

  • on success the producer calls onSuccess exactly once, then stops
  • on error the producer calls onError exactly once, then stops
  • by means of stackDepth the producer communicates the current stack depth, so upon reaching a maximum (e.g. Monix's Scheduler.recommendedBatchSize, which is 512 on the JVM or 256 on JS), you can choose to push another task into your thread-pool
  • that's the purpose of safeOnSuccess and safeOnError
  • the API of this callback changed several times already; I don't really care about it, as long as it works, because it is meant to be encapsulated, internal, hidden from users
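One possible shape for the elided safeOnSuccess / safeOnError bodies, sketched with a plain ExecutionContext instead of Monix's Scheduler and a hypothetical MaxStackDepth = 512 standing in for Scheduler.recommendedBatchSize. While the depth is below the limit, the consumer is called directly with depth + 1; at the limit, the call is pushed to the pool and the depth restarts at 0 on a fresh stack.

```scala
import scala.concurrent.ExecutionContext

// Hypothetical limit standing in for Scheduler.recommendedBatchSize.
val MaxStackDepth = 512

abstract class Callback[-T] {
  def onSuccess(value: T, stackDepth: Int): Unit
  def onError(ex: Throwable, stackDepth: Int): Unit

  // Call the consumer directly while the stack is shallow; otherwise push
  // the call onto the pool, where it starts again from depth 0.
  def safeOnSuccess(ec: ExecutionContext, stackDepth: Int, value: T): Unit =
    if (stackDepth < MaxStackDepth) onSuccess(value, stackDepth + 1)
    else ec.execute(new Runnable { def run(): Unit = onSuccess(value, 0) })

  def safeOnError(ec: ExecutionContext, stackDepth: Int, ex: Throwable): Unit =
    if (stackDepth < MaxStackDepth) onError(ex, stackDepth + 1)
    else ec.execute(new Runnable { def run(): Unit = onError(ex, 0) })
}
```

Under these assumptions, a callback that re-invokes itself 100,000 times through safeOnSuccess never sees a depth counter above 512; instead it hops to the pool once every 513 calls.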

So we can model our task:

sealed abstract class Task[+T] { self =>
  /** Characteristic function for our [[Task]]. Never use this directly. */
  protected def unsafeRunFn(
    scheduler: Scheduler,
    cancelable: MultiAssignmentCancelable,
    stackDepth: Int,
    callback: Callback[T]): Unit
    
  // ...
  // Example operator
  def map[U](f: T => U): Task[U] =
    Task.unsafeCreate[U] { (scheduler, cancelable, depth, cb) =>
      self.stackSafeRun(scheduler, cancelable, depth, new Callback[T] {
        def onError(ex: Throwable, depth: Int): Unit =
          cb.safeOnError(scheduler, depth, ex)

        def onSuccess(value: T, depth: Int): Unit =
          try {
            val u = f(value)
            cb.safeOnSuccess(scheduler, depth, u)
          } catch {
            case NonFatal(ex) =>
              cb.safeOnError(scheduler, depth, ex)
          }
      })
    }
}

object Task {
  /** Creates a task, similar to Scalaz' Task.async */
  def create[T](signal: (Try[T] => Unit) => Cancelable): Task[T] =
    Task.unsafeCreate { (scheduler, cancelable, depth, cb) =>
      try {
        cancelable := signal {
          case Success(value) =>
            cb.safeOnSuccess(scheduler, depth, value)
          case Failure(ex) =>
            cb.safeOnError(scheduler, depth, ex)
        }
      } catch {
        case NonFatal(ex) =>
          cb.safeOnError(scheduler, depth, ex)
      }
    }

  /** Internal */
  private def unsafeCreate[T](f: (Scheduler, MultiAssignmentCancelable, Int, Callback[T]) => Unit): Task[T] =
    new Task[T] {
      def unsafeRunFn(s: Scheduler, c: MultiAssignmentCancelable, d: Int, cb: Callback[T]): Unit =
        f(s, c, d, cb)
    }
}

The overall philosophy is: ugly (but well encapsulated) internals, well behaved API.

@milessabin

Could you put together a side-by-side comparison between the design you're proposing here and Scalaz's Task? Something short, just bullet points with a few code snippets to illustrate usage.

@alexandru
Author

Yes, I will. I'm currently fleshing out the implementation, since this didn't exist yesterday morning and after that I'll provide a comparison :-)

@alexandru
Author

Updated :-)

@tpolecat

Thanks for this. Very interesting.

Can you clarify whether these programs are the same or not?

val x =
  for {
    _ <- Task { println("hi") }
    _ <- Task { println("hi") }
  } yield ()

and

val hi = Task { println("hi") }
val x =
  for {
    _ <- hi
    _ <- hi
  } yield ()

@adelbertc

Depending on how the final implementation looks, perhaps you may also want to see how suitable it may be to be in Cats itself? :-)

That being said, we do have a WIP branch here - I believe the progress stopped at making it stack safe, with tests showing it StackOverflowError-ing.

@alexandru
Author

@tpolecat yes, it behaves like you'd expect, the two programs are the same.

@adelbertc sure; btw, this implementation is stack safe

@ngbinh

ngbinh commented Dec 23, 2015

looks great! 👍


ghost commented Dec 23, 2015

@tpolecat re question from cats gitter: "is there a better place to discuss this?"

@adelbertc re WIP branch

Both points were brought up by @non in https://github.com/non/cats/issues/32#issuecomment-141079117, but that was a while ago. Specifically:

I think using this issue to brainstorm, ask questions, and help try to flesh out the requirements of a task in Cats would definitely be helpful.

and

Thus, feature/task (in its current state) is not eligible to be merged.

Just as a suggestion, perhaps the old issue has got stale and rather than try and reboot it, it might be worth launching a V2 issue?

@alexandru
Author

👍 @inthenow.

I have updated the document. It now contains a list of what this implementation does better than the Task in Scalaz, as I think @milessabin wanted. See the "What's better than Scalaz?" section.


ghost commented Dec 23, 2015

Cool. Before I/we do this, there are two issues regarding moving the current Future instances from Cats to Alleycats - non/alleycats#26 and https://github.com/non/cats/issues/589.

Given this - would Alleycats be the better place to open the new issue and possible first implementation?

@alexandru
Author

Changed again (Dec 30).

@puffnfresh

scalaz.concurrent.Task is not very good. Using just a reasonable IO type, it should be possible to subsume Task's use cases with some uses of MVar-like atomic-references. For example:

scalaz/scalaz@series/7.3.x...puffnfresh:feature/concurrent-io

I highly recommend focusing on doing a good job at I/O and then using that to talk about concurrency, rather than trying to do the opposite.

@gpampara

@puffnfresh: 👍

@aloiscochard

Hi all,

You might be interested to look at my matterhorn experiment: https://github.com/aloiscochard/matterhorn

It is basically a specialized interpreter for IO actions: https://github.com/aloiscochard/matterhorn/blob/master/core/src/main/scala/rts/Interpreter.scala

There are some tests and benchmarks showing how it works:
https://github.com/aloiscochard/matterhorn/blob/master/core/src/test/scala/CoreSpec.scala
https://github.com/aloiscochard/matterhorn/blob/master/bench/src/main/scala/pure.scala

The approach to concurrency is to use STM; I just plugged in a library for rapid prototyping.

Good luck

@alexandru
Author

@aloiscochard, @puffnfresh thanks, will take a look.

@pchiusano

scalaz.concurrent.Task is not very good. Using just a reasonable IO type, it should be possible to subsume Task's use cases with some uses of MVar-like atomic-references. For example:

I agree, and that is the route we are going with FS2's Task type (which I suppose could be renamed to 'IO'), which implements MVar-like references and then uses that to implement async (and lots of other operations, like parallelTraverse, as well as all the concurrent stream operations of FS2 itself): https://github.com/functional-streams-for-scala/fs2/blob/topic/redesign/core/src/main/scala/fs2/Async.scala#L75
