@xuwei-k
Forked from alexandru/task-proposal.md
Created December 30, 2015 14:36
Task: A diverging design from Future and Scalaz Task

Monifu Task: A diverging design from Future and Scalaz Task

This document is about Task, an alternative to Scalaz's Task and Scala's Future. Note that this is a work in progress and the document has already gone through several revisions.

The underlying design remains the same, a design inspired by Observable. The implementation currently consists of:

  • Task (new)
  • Scheduler (an enhanced ExecutionContext, capable of scheduleOnce(delay), already in Monifu)

So what's wrong with Scalaz Task?

The Task in Scalaz is a convenience wrapper around Scalaz's Future. The implementation is quite ingenious: Future (by means of Now, Async, Suspend, and Bind) describes the stages of a trampoline. You can see this in its API.

  • Task.apply(f: => A) forks a thread (yielding an Async state)
  • Task.delay(f: => A) executes f on the current thread, trampolined (yielding a Suspend(Now(f)) state)
  • Task.now(a) is simply returning a (yielding a Now state)
  • task.flatMap(f) yields either BindSuspend or BindAsync, depending on what kind of future f produces (immediate or something that should fork another thread)
  • Task.fork(f: => Task[A]) executes f on another thread (yielding a BindAsync I think, because it does a join after the initial Suspend)
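
To make "stages of a trampoline" concrete, here is a minimal sketch of a trampolined interpreter. It is not Scalaz's code: the `Step` type, the single `Bind` case, and the `run` loop are simplifications made up for illustration, and there is no `Async` state at all, so everything runs on the calling thread.

```scala
object TrampolineSketch {
  // Simplified, hypothetical states; the names echo the ones in the text,
  // but this is not Scalaz's implementation.
  sealed trait Step[+A]
  final case class Now[A](value: A) extends Step[A]
  final case class Suspend[A](thunk: () => Step[A]) extends Step[A]
  final case class Bind[A, B](source: Step[A], f: A => Step[B]) extends Step[B]

  // Interprets the steps in a loop on the current thread. Pending
  // continuations live in a heap-allocated list, not on the call stack.
  def run[A](start: Step[A]): A = {
    var current: Step[Any] = start
    var pending: List[Any => Step[Any]] = Nil
    var result: Option[Any] = None
    while (result.isEmpty) {
      current match {
        case Now(value) =>
          pending match {
            case f :: rest =>
              pending = rest
              current = f(value)
            case Nil =>
              result = Some(value)
          }
        case Suspend(thunk) =>
          current = thunk()
        case Bind(source, f) =>
          pending = f.asInstanceOf[Any => Step[Any]] :: pending
          current = source
      }
    }
    result.get.asInstanceOf[A]
  }

  // A loop that is far too deep for plain, non-tail recursion
  def countdown(n: Int): Step[Int] =
    if (n == 0) Now(0)
    else Bind[Int, Int](Suspend[Int](() => Now(n - 1)), m => countdown(m))
}
```

`TrampolineSketch.run(TrampolineSketch.countdown(100000))` completes without a stack overflow, but it also never leaves the calling thread, which is the behavior the list above describes.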

In general it's a good implementation, however I believe that the API could do better, because of these problems:

  1. the Future (and consequently Task) in Scalaz is not about asynchronous execution, but about trampolined execution; the distinction is subtle, but you can see it in its API
      • by default stuff gets executed on the current thread
      • Task.fork can then force the execution to jump to another thread, but that's too much of a manual intervention for users: the only instance in which this is user-friendly is when you don't want stuff to block the thread from which you execute .run, so doing a Task.fork in that case is a no-brainer
      • but for asynchronous stuff you never want to block the thread on which you .run
  2. Scalaz Task has a dual personality; even though trampolined execution implies asynchronicity, the current thread can be used (indefinitely), so the Scalaz Task has been used for getting results synchronously as well (which is why def run: A happened)
  3. by exposing implementation details (e.g. the trampoline), Scalaz ends up having liveness issues
      • for asynchronous stuff it's a good idea to not block the thread where you .run
      • it's also a good idea to split synchronous loops into batches such that no thread gets blocked indefinitely
      • this is especially important on top of Javascript, where concurrency is cooperative and where by blocking the run-loop you're blocking everything, including the browser's UI
  4. Scalaz ends up being very efficient when running tail-recursive loops on the same thread, but when you fork threads I've seen really bad throughput compared with both my implementation and Scala's Future

The argument brought forth is that the Scalaz Task is more efficient than Scala's standard Future, which has this habit of executing every operation in its own thread-pool task. But Scala's standard Future doesn't have liveness issues on top of Javascript. And when talking performance, these abstractions are in general used for dealing with I/O, because for CPU-bound stuff the Future / Task pattern is a pretty poor choice.

Monifu's Task (currently) does not use a trampoline. It simply executes operations directly, up to a maximum stack depth. When that depth is exceeded, a new task gets submitted to the thread-pool. This is hidden from the user, which keeps the implementation separate from the API contract. FP patterns are awesome, but sometimes I feel like we are exposing too much (immutability FTW) and thus losing the ideal of the black box in the process.
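
The "direct calls up to a maximum depth, then a new pool task" idea can be sketched with nothing but the standard library. This is an illustration, not Monifu's code: `MaxDepth`, `countForks`, `onNext` and `proceed` are hypothetical names, and a plain `ExecutionContext` stands in for the `Scheduler`.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Promise}
import scala.concurrent.duration._

object BoundedDepthSketch {
  // Assumed limit; the text later quotes 512 as the batch size on the JVM
  val MaxDepth = 512

  /** Runs `n` steps and returns how many times the loop had to fork. */
  def countForks(n: Int)(implicit ec: ExecutionContext): Int = {
    val forks = new AtomicInteger(0)
    val done = Promise[Int]()

    // One logical step; hands over to `proceed` to decide how to continue
    def onNext(remaining: Int, depth: Int): Unit =
      if (remaining == 0) {
        done.success(forks.get)
        ()
      } else proceed(remaining - 1, depth)

    // Direct call while the stack is shallow, otherwise a fresh pool task
    def proceed(remaining: Int, depth: Int): Unit =
      if (depth < MaxDepth) onNext(remaining, depth + 1)
      else {
        forks.incrementAndGet()
        ec.execute(new Runnable {
          def run(): Unit = onNext(remaining, 1)
        })
      }

    onNext(n, 1)
    Await.result(done.future, 10.seconds)
  }
}
```

With these assumptions a loop of 100 steps never leaves the calling thread, and a loop of 20400 steps is resubmitted to the pool 39 times, so no single pool task runs for more than 512 steps.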

Versus Scala's Future

I view Task as complementary to Scala's Future, not as a replacement. See below for details. The problem we are trying to solve is that Future represents a running computation that memoizes its result. It's perfect for its primary use-cases, but is sometimes misused and misunderstood.

  • every one of Future's operators takes an implicit ExecutionContext (making it slightly incompatible with various type-class interfaces)
  • the reason is that every one of Future's operators is side-effecting, immediately sending a task in a thread-pool for execution (this actually makes sense when you're doing memoization)
  • it goes without saying that Future does not have referential transparency
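
The lack of referential transparency is easy to observe with the standard library alone. The sketch below (the object and method names are made up for the example) counts how often a side effect runs when a `Future` expression is written out twice versus bound to a `val` once.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object EagerFutureSketch {
  /** Returns the number of effect runs caused by each of the two versions. */
  def demo(): (Int, Int) = {
    val calls = new AtomicInteger(0)
    def effect(): Future[Int] = Future(calls.incrementAndGet())

    // Version 1: the expression is written out twice, so the effect runs twice
    val v1 = for { a <- effect(); b <- effect() } yield (a, b)
    Await.result(v1, 5.seconds)
    val runsInV1 = calls.get

    // Version 2: the expression is bound to a val, so it starts
    // immediately, runs once, and its result is memoized
    val shared = effect()
    val v2 = for { a <- shared; b <- shared } yield (a, b)
    Await.result(v2, 5.seconds)
    val runsInV2 = calls.get - runsInV1

    (runsInV1, runsInV2)
  }
}
```

`EagerFutureSketch.demo()` returns `(2, 1)`: substituting an expression for the `val` it was bound to changes how often the effect happens.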

Monifu's Task fixes these problems. For the FP purists, do note that Monifu's Task still breaks the left identity monad law: the law does not hold for functions f that throw exceptions (just like Try and Future before it):

// for any f: T => Task[U] that throws instead of returning a failed Task
Task(t).flatMap(f) === f(t) // not true

But for me that's unavoidable and I don't even want to attempt fixing that just to tick a checkbox, as Task is about asynchronous execution and that means dealing with non-determinism (on the underlying Java or Javascript platforms).
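
Since the text says Try has the same property, the broken law can be shown with `scala.util.Try` and no Task at all. The function `f` below is a made-up example that throws for one input.

```scala
import scala.util.Try

object LeftIdentitySketch {
  // A function that throws instead of returning a Failure
  val f: Int => Try[Int] = n =>
    if (n == 0) throw new IllegalArgumentException("zero")
    else Try(10 / n)

  // Left side of the law: flatMap catches the exception, yielding a Failure
  def left(t: Int): Try[Int] = Try(t).flatMap(f)

  // Right side of the law: the exception escapes to the caller
  def right(t: Int): Try[Int] = f(t)
}
```

For `t = 2` both sides are `Success(5)`. For `t = 0` the left side is a `Failure` while the right side throws, so the two are not interchangeable.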

API & Usage

To answer a question, x1 and x2 will produce the same result in both Monifu and Scalaz:

val x1 =
  for {
    _ <- Task { println("hi") }
    _ <- Task { println("hi") }
  } yield ()
  
val hi = Task { println("hi") }
val x2 =
  for {
   _ <- hi
   _ <- hi
  } yield ()  

The effect will be different however. Upon .runAsync Monifu will fork a single thread, whereas Scalaz will fork twice. But it gets better:

def loop(n: Int): Task[Unit] = 
  Task(println(n)).flatMap(_ => loop(n - 1))
  
// this forks a single thread in Monifu and ~100 in Scalaz
loop(100).runAsync

// this forks ~40 threads in Monifu on the JVM and ~80 tasks on JS 
// (by means of setTimeout), but ~20400 logical threads in Scalaz,
// freezing the run-loop in Javascript for the duration of that loop
loop(20400).runAsync

This is where Monifu and Scalaz diverge in philosophy. To fix the above, you'd have to use Task.delay and then introduce Task.fork instructions from time to time. But that's a manual intervention that requires knowledge of how Scalaz Task works, and it can smell either like premature optimization or like a bug fix for a process that freezes the system.

Here's another difference in the signature of .apply():

// Scalaz
def apply[A](a: => A)(implicit pool: ExecutorService): Task[A]

// Monifu
def apply[T](f: => T): Task[T]

As a matter of philosophy, the difference is that in Monifu the execution of a Task is ALWAYS asynchronous, meaning that we expect tasks to fork at any moment and the user does not control the forking, so runAsync is where we take our Scheduler. Also, Monifu uses Scheduler, an enhanced ExecutionContext that's also supported on Scala.js, whereas Scalaz relies on Java's ExecutorService.

Here come the operators. This code is valid with both, but Monifu's memory usage is more efficient (for the time being):

// Yes, this uses a lot of memory (because of that map),
// but should not trigger stack overflows.
// Also, remove the map, and you've got a tail recursion.

def sum(n: Int, acc: Long = 0): Task[Long] = {
  if (n == 0) Task(acc) else
    Task(n).flatMap(x => sum(x-1, acc + x).map(_ + 1))
}

We can recover from an error (similar to Future.recover and Future.recoverWith; the same thing is possible with the Scalaz Task):

// Monifu
monifuTask.onErrorRecover {
  case _: SomeException => 
    "fallback value"
}

monifuTask.onErrorRecoverWith {
  case _: SomeException => 
    Task("fallback value")
}
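
For comparison, these are the standard library counterparts mentioned above, `Future.recover` and `Future.recoverWith`. The `SomeException` class and the `failing` helper are placeholders defined only for this example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecoverSketch {
  // Hypothetical exception type, standing in for SomeException above
  final class SomeException extends RuntimeException("boom")

  def failing(): Future[String] = Future.failed(new SomeException)

  // recover maps the error to a plain value
  def recovered(): Future[String] =
    failing().recover { case _: SomeException => "fallback value" }

  // recoverWith maps the error to another asynchronous computation
  def recoveredWith(): Future[String] =
    failing().recoverWith {
      case _: SomeException => Future.successful("fallback value")
    }
}
```

Both `recovered()` and `recoveredWith()` complete with `"fallback value"`.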

We can delay execution:

// Monifu
task.delay(10.minutes)

// Scalaz
scalazTask.after(10.minutes)

We can interrupt a task if it takes too long to execute:

// will fail with a `TimeoutException` after 10 seconds
monifuTask.timeout(10.seconds)

// will fallback to a backup after 10 seconds of inactivity
monifuTask.timeout(10.seconds, Task("another value"))
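
One way such a timeout can be built is by racing the source against a timer on a shared `Promise`. The sketch below does that for a standard `Future`; it is an assumption about the general technique, not Monifu's implementation, and unlike a real task timeout it does not stop the source computation, it only stops waiting for it. The names `TimeoutSketch` and `timeout` are hypothetical.

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.TimeoutException
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object TimeoutSketch {
  // Daemon timer, so that it never keeps the JVM alive on its own
  private val timer = new Timer("timeout-sketch", true)

  def timeout[A](source: Future[A], after: FiniteDuration)(
      implicit ec: ExecutionContext): Future[A] = {
    val result = Promise[A]()

    // Loses the race silently if the source has already completed
    val expire = new TimerTask {
      def run(): Unit = {
        result.tryFailure(new TimeoutException(s"no result after $after"))
        ()
      }
    }
    timer.schedule(expire, after.toMillis)

    // Whichever side completes the promise first wins
    source.onComplete { outcome =>
      expire.cancel()
      result.tryComplete(outcome)
    }
    result.future
  }
}
```

A fallback variant could be layered on top with `recoverWith { case _: TimeoutException => backup }`.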

We can zip 2 tasks:

val a = Task(1)
val b = Task(2)

val c: Task[(Int, Int)] = a.zip(b)

Contrary to Future, a Task doesn't execute immediately; just like its counterpart in Scalaz, it has to be run explicitly before it produces its result and its side-effects:

// Execution with Monifu (nothing happened until now)
// Monifu's Task always requires an execution context on run
import monifu.concurrent.Implicits.globalScheduler
monifuTask.runAsync {
  case Success(value) => println(value)
  case Failure(ex) => ex.printStackTrace
}

// Execution with Scalaz
scalazTask.unsafePerformAsync {
  case -\/(ex) => ex.printStackTrace
  case \/-(value) => println(value)
}

Blocking the current thread for the result:

// in Monifu
Await.result(monifuTask.runAsync, 10.seconds)

// in Scalaz
task.unsafePerformSyncFor(10.minutes)

Monifu's method is actually better because we're piggybacking on Await.result from the standard library. This has the benefit that it communicates with Scala's (or soon to be Java's) ForkJoin pool, exposed by the default ExecutionContext, to maybe add more threads in case a blocking operation occurs (God forbid). It does so by means of the blocking block, in combination with Scala's BlockContext. And that's pretty cool.
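
The `blocking` block mentioned here is plain standard library and can be tried without Monifu. In this sketch, `slowLookup` is a made-up stand-in for some blocking call.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingSketch {
  // Marking the sleep as `blocking` signals the current BlockContext,
  // which lets the global pool compensate with an extra thread if needed
  def slowLookup(): Future[String] = Future {
    blocking {
      Thread.sleep(100)
      "done"
    }
  }

  // Await.result itself also goes through `blocking` internally
  def result(): String = Await.result(slowLookup(), 10.seconds)
}
```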

I'm also quite fond of my integration with Scala's Future. If you have to memoize the result in order to share it, then Future is there and does what it's supposed to. Plus it surely beats callbacks. So Monifu's Task has the following runAsync signature:

import monifu.concurrent.Implicits.globalScheduler

val task: Task[T] = ???
val future: Future[T] = task.runAsync

// conversion works both ways
val anotherTask = Task.fromFuture(Future("hello"))
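
To illustrate how a lazy task and a memoizing Future relate, here is a toy model. `LazyTask` is a hypothetical stand-in written for this example and is not Monifu's Task; it only shows that the execution context is supplied at run time and that each run re-evaluates, while a wrapped Future is shared.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object LazyTaskSketch {
  // Hypothetical stand-in for Task: a description that only runs on demand
  final class LazyTask[A](run: ExecutionContext => Future[A]) {
    def runAsync(implicit ec: ExecutionContext): Future[A] = run(ec)
  }

  object LazyTask {
    // Nothing is evaluated here; `a` is captured by name
    def apply[A](a: => A): LazyTask[A] =
      new LazyTask[A](ec => Future(a)(ec))

    // The future is already running (or done); every run shares its result
    def fromFuture[A](f: Future[A]): LazyTask[A] =
      new LazyTask[A](_ => f)
  }

  /** Returns the run counter before running, after two runs, and at the end. */
  def demo()(implicit ec: ExecutionContext): (Int, Int, Int) = {
    val runs = new AtomicInteger(0)
    val task = LazyTask { runs.incrementAndGet() }
    val before = runs.get

    Await.result(task.runAsync, 5.seconds)
    Await.result(task.runAsync, 5.seconds)
    val afterTwoRuns = runs.get

    // Converting to a Future starts one more run, which is then memoized
    val memoized = LazyTask.fromFuture(task.runAsync)
    Await.result(memoized.runAsync, 5.seconds)
    Await.result(memoized.runAsync, 5.seconds)

    (before, afterTwoRuns, runs.get)
  }
}
```

`demo()` returns `(0, 2, 3)`: building the task runs nothing, each `runAsync` runs the body again, and the task built from a Future runs the body once no matter how often it is executed.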

Implementation details

A Task models a producer/consumer relationship in which the producer pushes a single value to the consumer on success, or an exception on error and then stops. Thus we can have a consumer interface that mirrors the Observer in Rx, like so:

  abstract class Callback[-T] { self =>
    def onSuccess(value: T, stackDepth: Int): Unit
    def onError(ex: Throwable, stackDepth: Int): Unit
    
    def safeOnSuccess(s: Scheduler, stackDepth: Int, value: T): Unit = ???
    def safeOnError(s: Scheduler, stackDepth: Int, ex: Throwable): Unit = ???
  }

This type is not meant for users, but for people implementing operators, builders, etc... The contract is this:

  • on success the producer calls onSuccess exactly once, then stops
  • on error the producer calls onError exactly once, then stops
  • by means of stackDepth the producer is communicating the current stack depth, so upon reaching a maximum size (e.g. like Monifu has Scheduler.recommendedBatchSize, which is 512 on the JVM or 256 on JS), you can choose to push another task in your thread-pool
  • so that's the purpose of safeOnSuccess and safeOnError
  • the contract is not meant for users, but for implementers of operators, as in people using Task.unsafeCreate
  • the API of this callback changed several times already; I don't really care about it, as long as it works, because it is meant to be encapsulated and hidden from users
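
One plausible way to fill in the `???` bodies, given the contract above, is sketched below. This is a guess at the shape, not the actual Monifu source: a plain `ExecutionContext` replaces the `Scheduler`, and `BatchSize` is an assumed constant standing in for `Scheduler.recommendedBatchSize`.

```scala
import scala.concurrent.ExecutionContext

object CallbackSketch {
  // Assumed limit, standing in for Scheduler.recommendedBatchSize on the JVM
  val BatchSize = 512

  abstract class Callback[-T] { self =>
    def onSuccess(value: T, stackDepth: Int): Unit
    def onError(ex: Throwable, stackDepth: Int): Unit

    // Direct call while the stack is shallow; otherwise the signal is
    // handed to the pool and the depth counter restarts at 1
    def safeOnSuccess(ec: ExecutionContext, stackDepth: Int, value: T): Unit =
      if (stackDepth < BatchSize) onSuccess(value, stackDepth + 1)
      else ec.execute(new Runnable {
        def run(): Unit = self.onSuccess(value, 1)
      })

    // Same policy for the error signal
    def safeOnError(ec: ExecutionContext, stackDepth: Int, ex: Throwable): Unit =
      if (stackDepth < BatchSize) onError(ex, stackDepth + 1)
      else ec.execute(new Runnable {
        def run(): Unit = self.onError(ex, 1)
      })
  }
}
```

Operators then only ever call the `safe*` variants on the downstream callback, so the depth bookkeeping stays in one place.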

So we can model our task:

sealed abstract class Task[+T] { self =>
  /** Characteristic function for our `Task` */
  protected def unsafeRunFn(scheduler: Scheduler, stackDepth: Int, callback: Callback[T]): Unit
  
  // ...
  // Example operator
  def map[U](f: T => U): Task[U] =
    Task.unsafeCreate[U] { (s, depth, cb) =>
      self.stackSafeRun(s, depth, new Callback[T] {
        def onError(ex: Throwable, depth: Int): Unit =
          cb.safeOnError(s, depth, ex)

        def onSuccess(value: T, depth: Int): Unit =
          try {
            val u = f(value)
            cb.safeOnSuccess(s, depth, u)
          } catch {
            case NonFatal(ex) =>
              cb.safeOnError(s, depth, ex)
          }
      })
    }  
}

The overall philosophy is: ugly (but well encapsulated) internals, well behaved API.
