This document is about `Task`, an alternative to Scalaz's `Task` or Scala's `Future`.
Note that this is a work in progress and this document has already gone through multiple revisions.
The underlying design remains the same, a design inspired by Observable. The implementation currently consists of:
- `Task` (new)
- `Scheduler` (an enhanced `ExecutionContext`, capable of `scheduleOnce(delay)`, already in Monifu)
The `Task` in Scalaz is a convenience wrapper around Scalaz's `Future`. The implementation is quite ingenious, with `Future` (by means of `Now`, `Async`, `Suspend`, `Bind`) describing the stages of a trampoline. You can see this in its API.
- `Task.apply(f: => A)` forks a thread (yielding an `Async` state)
- `Task.delay(f: => A)` executes `f` on the current thread, trampolined (yielding a `Suspend(Now(f))` state)
- `Task.now(a)` is simply returning `a` (yielding a `Now` state)
- `task.flatMap(f)` yields either `BindSuspend` or `BindAsync`, depending on what kind of future `f` produces (immediate or something that should fork another thread)
- `Task.fork(f: => Task[A])` executes `f` on another thread (yielding a `BindAsync` I think, because it does a join after the initial `Suspend`)
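To make the "stages of a trampoline" idea more tangible, here is a minimal sketch of an interpreter over three stages. The names `Step`, `Now`, `Suspend`, `Bind`, `run` and `countDown` are my own simplification for illustration, not the Scalaz definitions, and the `Async` stage is left out entirely.

```scala
object TrampolineSketch {
  // Hypothetical, simplified stages (assumption: not the Scalaz types).
  sealed trait Step[+A]
  final case class Now[A](value: A) extends Step[A]
  final case class Suspend[A](thunk: () => Step[A]) extends Step[A]
  final case class Bind[A, B](source: Step[A], f: A => Step[B]) extends Step[B]

  // Interprets the stages in a loop, keeping pending continuations on the
  // heap, so long flatMap-like chains do not grow the call stack.
  def run[A](step: Step[A]): A = {
    var current: Step[Any] = step
    var pending: List[Any => Step[Any]] = Nil
    var result: Option[Any] = None
    while (result.isEmpty) {
      current match {
        case Now(value) =>
          pending match {
            case next :: rest =>
              pending = rest
              current = next(value)
            case Nil =>
              result = Some(value)
          }
        case Suspend(thunk) =>
          current = thunk()
        case b: Bind[_, _] =>
          pending = b.f.asInstanceOf[Any => Step[Any]] :: pending
          current = b.source
      }
    }
    result.get.asInstanceOf[A]
  }

  // A chain of n binds that would overflow the stack with naive recursion.
  def countDown(n: Int): Step[Int] =
    if (n <= 0) Now(0)
    else Bind[Int, Int](Suspend(() => Now(n - 1)), m => countDown(m))
}
```

Calling `TrampolineSketch.run(TrampolineSketch.countDown(100000))` walks the whole chain on the calling thread, which is exactly the behaviour the list above describes for the non-forking stages.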
In general it's a good implementation, however I believe that the API could do better, because of these problems:
- the `Future` (and consequently `Task`) in Scalaz is not about asynchronous execution, but about trampolined execution; the distinction is subtle, but you can see it in its API
  - by default stuff gets executed on the current thread
  - `Task.fork` can then force the execution to jump on another thread, but that's too much of a manual intervention for users: the only instance in which this is user-friendly is when you don't want stuff to block the thread from where you execute `.run`, so doing a `Task.fork` in that case is a no-brainer
  - but for asynchronous stuff you never want to block the thread on which you `.run`
- Scalaz `Task` has a dual personality; even though trampolined execution implies asynchronicity, because the current thread can be used (indefinitely), the Scalaz `Task` has been used for getting results synchronously as well (hence why `def run: A` happened)
  - it's been said that the root of all evil on top of the JVM is unclear boundaries between synchronous and asynchronous callbacks
- Scalaz, by exposing implementation details (e.g. the trampoline), ends up having liveness issues
  - for asynchronous stuff it's a good idea to not block the thread where you `.run`
  - it's also a good idea to split synchronous loops into batches such that no thread gets blocked indefinitely
  - this is especially important on top of Javascript, where concurrency is cooperative and where by blocking the run-loop you're blocking everything, including the browser's UI
- Scalaz ends up being very efficient when running tail-recursive loops on the same thread, but when you fork threads I've seen really bad throughput compared with both my implementation and Scala's `Future`
The argument brought forth is that the Scalaz Task is more efficient than Scala's standard `Future`, which has this habit of executing every operation in its own thread-pool task. But Scala's standard `Future` doesn't have liveness issues on top of Javascript. And when talking performance, these abstractions are in general used for dealing with I/O, because for CPU-bound stuff the Future / Task pattern is a pretty poor choice.
Monifu's Task (currently) does not use a trampoline. It simply executes operations directly, up to a maximum stack depth. When that's exceeded, a new task gets submitted to the thread-pool. And this is hidden from the user, as it draws a line between implementation and API contract. FP patterns are awesome, but sometimes I feel like we are exposing too much (immutability FTW) and thus losing the ideal of the black box in the process.
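The "execute directly up to a maximum stack depth, then resubmit" idea can be sketched with nothing but the standard library. This is an illustration, not Monifu's actual code: `BoundedDepthSketch`, `runLoop`, `safeCall` and the `MaxStackDepth` value of 512 are assumptions made for the example, and a plain `ExecutionContext` stands in for `Scheduler`.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future, Promise}

object BoundedDepthSketch {
  // Assumed limit, chosen only for this example.
  val MaxStackDepth = 512

  // Runs a callback-driven loop of n steps and completes with the number
  // of times the loop had to be handed off to the thread-pool.
  def runLoop(n: Int)(implicit ec: ExecutionContext): Future[Int] = {
    val forks = new AtomicInteger(0)
    val done = Promise[Int]()

    // Calls `body` directly while the depth is under the limit;
    // otherwise submits it to the pool, restarting the depth count.
    def safeCall(depth: Int)(body: Int => Unit): Unit =
      if (depth < MaxStackDepth) body(depth + 1)
      else {
        forks.incrementAndGet()
        ec.execute(new Runnable { def run(): Unit = body(1) })
      }

    def loop(remaining: Int, depth: Int): Unit =
      if (remaining == 0) done.success(forks.get)
      else safeCall(depth)(d => loop(remaining - 1, d))

    loop(n, 0)
    done.future
  }
}
```

The caller only sees a `Future[Int]`; whether a given step ran on the current stack or was rescheduled stays an implementation detail, which is the point being made above.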
I view `Task` as complementary to Scala's `Future`, not as a replacement. See below for details. The problem we are trying to solve is that `Future` represents a running computation that memoizes its result. It's perfect for its primary use-cases, but is sometimes misused and misunderstood.
- every one of Future's operators takes an implicit `ExecutionContext` (making it slightly incompatible with various type-class interfaces)
- the reason is that every one of Future's operators is side-effecting, immediately sending a task in a thread-pool for execution (this actually makes sense when you're doing memoization)
- it goes without saying that `Future` does not have referential transparency
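The lack of referential transparency is easy to observe with the standard `Future` alone. In this small sketch (the object and method names are mine), replacing two identical expressions with a shared `val` changes how many times the side effect runs:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureMemoization {
  // Two separate Future expressions: the side effect runs twice.
  def inlined(): Int = {
    val counter = new AtomicInteger(0)
    val result = for {
      _ <- Future(counter.incrementAndGet())
      _ <- Future(counter.incrementAndGet())
    } yield ()
    Await.result(result, 5.seconds)
    counter.get // 2
  }

  // The same expression extracted into a val: the Future starts
  // immediately, memoizes its result, and the side effect runs once.
  def shared(): Int = {
    val counter = new AtomicInteger(0)
    val once = Future(counter.incrementAndGet())
    val result = for {
      _ <- once
      _ <- once
    } yield ()
    Await.result(result, 5.seconds)
    counter.get // 1
  }
}
```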
Monifu's `Task` fixes these problems. Well, for the FP purists, do note that Monifu's `Task` still breaks the left identity monad law. This means that this law is not true for functions `f` that throw exceptions (just like `Try` and `Future` before it):
// given a function f: T => Task[U] and a value t: T
Task(t).flatMap(f) === f(t) // not true when f throws
But for me that's unavoidable and I don't even want to attempt fixing that just to tick a checkbox, as `Task` is about asynchronous execution and that means dealing with non-determinism (on the underlying Java or Javascript platforms).
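Since `Try` breaks left identity in the same way, it makes for a self-contained illustration using only the standard library. The function `f` and the object name below are made up for the example:

```scala
import scala.util.Try

object LeftIdentitySketch {
  // A function that throws instead of returning a failed Try.
  val f: Int => Try[Int] = n =>
    if (n == 0) throw new IllegalArgumentException("zero")
    else Try(10 / n)

  // Left side of the law: flatMap catches the exception thrown by f
  // and turns it into a Failure.
  val viaFlatMap: Try[Int] = Try(0).flatMap(f)

  // Right side of the law: calling f directly lets the exception escape.
  def throwsDirectly: Boolean =
    try { f(0); false }
    catch { case _: IllegalArgumentException => true }
}
```

Here `viaFlatMap` is a `Failure`, while `f(0)` never produces a value at all, so the two sides of the law cannot be equal.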
To answer a question, `x1` and `x2` will produce the same result in both Monifu and Scalaz:
val x1 =
  for {
    _ <- Task { println("hi") }
    _ <- Task { println("hi") }
  } yield ()
val hi = Task { println("hi") }
val x2 =
  for {
    _ <- hi
    _ <- hi
  } yield ()
The effect will be different however. Upon `.runAsync` Monifu will fork a single thread, whereas Scalaz will fork twice. But it gets better:
def loop(n: Int): Task[Unit] =
  if (n <= 0) Task(()) // base case, so that the loop terminates
  else Task(println(n)).flatMap(_ => loop(n - 1))
// this forks a single thread in Monifu and ~100 in Scalaz
loop(100).runAsync
// this forks ~40 threads in Monifu on the JVM and ~80 tasks on JS
// (by means of setTimeout), but ~20400 logical threads in Scalaz,
// freezing the run-loop in Javascript for the duration of that loop
loop(20400).runAsync
This is where Monifu and Scalaz diverge in philosophy. To fix the above, you'd have to use `Task.delay` and then introduce `Task.fork` instructions from time to time. But that's a manual intervention that requires knowledge about how Scalaz Task works, and it can smell either like premature optimization or like a bug fix for a process that freezes the system.
Here's another difference, in the signature of `.apply()`:
// Scalaz
def apply[A](a: => A)(implicit pool: ExecutorService): Task[A]
// Monifu
def apply[T](f: => T): Task[T]
As a matter of philosophy, the difference is that in Monifu the execution of a `Task` is ALWAYS asynchronous, meaning that we expect tasks to fork at any moment. The user does not control the forking, so `runAsync` is the point where we take our `Scheduler`.
Also Monifu is using `Scheduler`, an enhanced `ExecutionContext` that's also supported on Scala.js, whereas Scalaz relies on Java's `ExecutorService`.
Here come the operators. This code is valid with both, but Monifu's memory usage is more efficient (for the time being):
// Yes, this uses a lot of memory (because of that map),
// but should not trigger stack overflows.
// Also, remove the map, and you've got a tail recursion.
def sum(n: Int, acc: Long = 0): Task[Long] = {
  if (n == 0) Task(acc) else
    Task(n).flatMap(x => sum(x - 1, acc + x).map(_ + 1))
}
We can recover from an error (similar to `Future.recover` and `Future.recoverWith`; the same thing is possible with the Scalaz Task):
// Monifu
monifuTask.onErrorRecover {
  case _: SomeException =>
    "fallback value"
}
monifuTask.onErrorRecoverWith {
  case _: SomeException =>
    Task("fallback value")
}
We can delay execution:
// Monifu
monifuTask.delay(10.minutes)
// Scalaz
scalazTask.delay(10.minutes)
We can interrupt a task if it takes too long to execute:
// will fail with a `TimeoutException` after 10 seconds
monifuTask.timeout(10.seconds)
// will fall back to a backup after 10 seconds of inactivity
monifuTask.timeout(10.seconds, Task("another value"))
We can zip 2 tasks:
val a = Task(1)
val b = Task(2)
val c: Task[(Int, Int)] = a.zip(b)
Contrary to `Future`, it doesn't execute immediately. Just like its counterpart in Scalaz, it has to be run explicitly before producing its result and its side-effects:
// Execution with Monifu (nothing happened until now)
// Monifu's Task always requires an execution context on run
import monifu.concurrent.Implicits.globalScheduler
monifuTask.runAsync {
  case Success(value) => println(value)
  case Failure(ex) => ex.printStackTrace()
}
// Execution with Scalaz
scalazTask.unsafePerformAsync {
  case -\/(ex) => ex.printStackTrace()
  case \/-(value) => println(value)
}
Blocking the current thread for the result:
// in Monifu
Await.result(monifuTask.runAsync, 10.seconds)
// in Scalaz
task.unsafePerformSyncFor(10.minutes)
Monifu's method is actually better because we're piggybacking on `Await.result` from the standard library. This has the benefit that it communicates with Scala's (or soon to be Java's) ForkJoin pool, exposed by the default `ExecutionContext`, to maybe add more threads in case a blocking operation occurs (God forbid). It does so by means of the `blocking` block, in combination with Scala's BlockContext. And that's pretty cool.
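For readers who haven't used it, this is roughly what the `blocking` block looks like with the standard library alone. The `slowLookup` helper is a made-up stand-in for any blocking call; whether the pool actually adds a thread is up to the `ExecutionContext` in use.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingSketch {
  // Hypothetical blocking operation, wrapped in `blocking` so that the
  // current BlockContext is told a thread is about to be parked.
  def slowLookup(): Future[String] = Future {
    blocking {
      Thread.sleep(100)
      "done"
    }
  }

  // Await.result itself goes through the same blocking mechanism
  // when it is called from a thread managed by the pool.
  def result(): String = Await.result(slowLookup(), 5.seconds)
}
```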
I'm also quite fond of my integration with Scala's `Future`. If you have to memoize the result in order to share it, then `Future` is there and does what it's supposed to. Plus it surely beats callbacks. So Monifu's Task has the following `runAsync` signature:
import monifu.concurrent.Implicits.globalScheduler
val task: Task[T] = ???
val future: Future[T] = task.runAsync
// conversion works both ways
val anotherTask = Task.fromFuture(Future("hello"))
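To show how a lazy task and a memoizing `Future` can complement each other, here is a toy model built on the standard library. `LazyTask` is a hypothetical stand-in written for this example, not Monifu's `Task`, and it takes a plain `ExecutionContext` instead of a `Scheduler`.

```scala
import scala.concurrent.{ExecutionContext, Future}

object LazyTaskSketch {
  // A toy task: just a description of how to start a Future later.
  final class LazyTask[A](start: ExecutionContext => Future[A]) {
    // Nothing runs until this is called; each call starts a new run.
    def runAsync(implicit ec: ExecutionContext): Future[A] = start(ec)
  }

  object LazyTask {
    // Defers evaluation of `body` until runAsync.
    def apply[A](body: => A): LazyTask[A] =
      new LazyTask(ec => Future(body)(ec))

    // Wraps an already running (and memoized) Future.
    def fromFuture[A](future: Future[A]): LazyTask[A] =
      new LazyTask(_ => future)
  }
}
```

In this model, running a task built with `LazyTask(...)` twice evaluates its body twice, while the `Future` returned by each `runAsync` can be shared freely because it memoizes that one run.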
A `Task` models a producer/consumer relationship in which the producer pushes a single value to the consumer on success, or an exception on error, and then stops. Thus we can have a consumer interface that mirrors the `Observer` in Rx, like so:
abstract class Callback[-T] { self =>
  def onSuccess(value: T, stackDepth: Int): Unit
  def onError(ex: Throwable, stackDepth: Int): Unit
  def safeOnSuccess(s: Scheduler, stackDepth: Int, value: T): Unit = ???
  def safeOnError(s: Scheduler, stackDepth: Int, ex: Throwable): Unit = ???
}
This type is not meant for users, but for people implementing operators, builders, etc... The contract is this:
- on success the producer calls `onSuccess` exactly once, then stops
- on error the producer calls `onError` exactly once, then stops
- by means of `stackDepth` the producer is communicating the current stack depth, so upon reaching a maximum size (e.g. Monifu has `Scheduler.recommendedBatchSize`, which is 512 on the JVM or 256 on JS), you can choose to push another task in your thread-pool
- so that's the purpose of `safeOnSuccess` and `safeOnError`
- the contract is not meant for users, but for implementers of operators, as in people using `Task.unsafeCreate`
- the API of this callback changed several times already; I don't really care about it, as long as it works, because it is meant to be encapsulated and hidden from users
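One way the two `safe*` methods could honour this contract is sketched below. It is only a guess at the shape, not the actual Monifu code: it uses a plain `ExecutionContext` in place of `Scheduler`, and `BatchSize` is a constant I picked to mirror the JVM value mentioned above.

```scala
import scala.concurrent.ExecutionContext

object CallbackSketch {
  // Assumed constant, standing in for Scheduler.recommendedBatchSize.
  val BatchSize = 512

  abstract class Callback[-T] {
    def onSuccess(value: T, stackDepth: Int): Unit
    def onError(ex: Throwable, stackDepth: Int): Unit

    // Calls onSuccess directly while under the batch size; otherwise
    // pushes the call to the thread-pool and restarts the depth count.
    def safeOnSuccess(ec: ExecutionContext, stackDepth: Int, value: T): Unit =
      if (stackDepth < BatchSize) onSuccess(value, stackDepth + 1)
      else ec.execute(new Runnable {
        def run(): Unit = onSuccess(value, 1)
      })

    // Same policy for the error channel.
    def safeOnError(ec: ExecutionContext, stackDepth: Int, ex: Throwable): Unit =
      if (stackDepth < BatchSize) onError(ex, stackDepth + 1)
      else ec.execute(new Runnable {
        def run(): Unit = onError(ex, 1)
      })
  }
}
```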
So we can model our task:
sealed abstract class Task[+T] { self =>
  /** Characteristic function for our `Task` */
  protected def unsafeRunFn(scheduler: Scheduler, stackDepth: Int, callback: Callback[T]): Unit
  // ...
  // Example operator
  def map[U](f: T => U): Task[U] =
    Task.unsafeCreate[U] { (s, depth, cb) =>
      self.stackSafeRun(s, depth, new Callback[T] {
        def onError(ex: Throwable, depth: Int): Unit =
          cb.safeOnError(s, depth, ex)
        def onSuccess(value: T, depth: Int): Unit =
          try {
            val u = f(value)
            cb.safeOnSuccess(s, depth, u)
          } catch {
            case NonFatal(ex) =>
              cb.safeOnError(s, depth, ex)
          }
      })
    }
}
The overall philosophy is: ugly (but well encapsulated) internals, well-behaved API.