This document is about `Task`, an alternative to Scalaz's `Task` or Scala's `Future`.
Note: this is a work in progress, and this document has already gone through multiple revisions. The underlying design remains the same, one inspired by `Observable`. The implementation currently consists of:
- `Task` (new)
- `Scheduler` (an enhanced `ExecutionContext`, capable of `scheduleOnce(delay)`, already in Monix)
- other utilities, such as `Cancelable`, `CancelableFuture`, `MultiAssignmentCancelable`, or Sincron's `Atomic` references
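As a rough illustration of "an `ExecutionContext` capable of `scheduleOnce(delay)`", here is a minimal sketch on top of the JDK's `ScheduledExecutorService`. `MiniScheduler` and `MiniCancelable` are hypothetical names and this is not Monix's actual `Scheduler`; it only shows the shape of the idea: run tasks like any `ExecutionContext`, plus schedule a one-shot action that can be canceled.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

// Hypothetical handle for undoing a scheduled action
trait MiniCancelable { def cancel(): Unit }

// Hypothetical ExecutionContext that can also run an action once, after a delay
final class MiniScheduler(underlying: ScheduledExecutorService) extends ExecutionContext {
  def execute(runnable: Runnable): Unit = underlying.execute(runnable)
  def reportFailure(cause: Throwable): Unit = cause.printStackTrace()

  def scheduleOnce(delay: FiniteDuration)(action: => Unit): MiniCancelable = {
    val pending = underlying.schedule(new Runnable { def run(): Unit = action }, delay.length, delay.unit)
    new MiniCancelable { def cancel(): Unit = { pending.cancel(false); () } }
  }
}

object MiniSchedulerDemo {
  // Returns (whether the kept action ran, whether the canceled action ran)
  def run(): (Boolean, Boolean) = {
    val pool = Executors.newSingleThreadScheduledExecutor()
    val scheduler = new MiniScheduler(pool)
    val fired = new CountDownLatch(1)
    val canceledRan = new AtomicBoolean(false)

    scheduler.scheduleOnce(20.millis)(fired.countDown())
    val handle = scheduler.scheduleOnce(200.millis)(canceledRan.set(true))
    handle.cancel()

    val keptRan = fired.await(2, TimeUnit.SECONDS)
    Thread.sleep(300) // wait past the canceled action's deadline
    pool.shutdown()
    (keptRan, canceledRan.get)
  }
}
```

Returning a cancelable handle from `scheduleOnce`, rather than nothing, is what would let higher-level operators (timeouts, delays) undo work that is still pending.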
The `Task` in Scalaz is a convenience wrapper around Scalaz's `Future`. The implementation is quite ingenious: `Future` is a data structure whose states (`Now`, `Async`, `Suspend`, `Bind`) describe the stages of a trampoline. You can see this in its API:
- `Task.apply(f: => A)` forks a thread (yielding an `Async` state)
- `Task.delay(f: => A)` executes `f` on the current thread, trampolined (yielding a `Suspend(Now(f))` state)
- `Task.now(a)` simply returns `a` (yielding a `Now` state)
- `task.flatMap(f)` yields either `BindSuspend` or `BindAsync`, depending on what kind of future the source task wraps (a suspended one, or an asynchronous one that forks another thread)
- `Task.fork(f: => Task[A])` executes `f` on another thread (yielding a `BindAsync`, I think, because it does a join after the initial `Suspend`)
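To make the trampoline idea concrete, here is a stripped-down sketch in the spirit of those states. It is not Scalaz's code: the names are hypothetical, there is a single `Bind` instead of `BindSuspend`/`BindAsync`, and the `Async` state is left out, so everything runs on the current thread. The point is that `flatMap` only builds a description, and a `@tailrec` loop interprets it.

```scala
import scala.annotation.tailrec

// Hypothetical, synchronous-only trampoline (no Async state)
sealed trait Trampoline[A] {
  def flatMap[B](f: A => Trampoline[B]): Trampoline[B] = Bind(this, f)
  def map[B](f: A => B): Trampoline[B] = flatMap(a => Now(f(a)))
}
final case class Now[A](value: A) extends Trampoline[A]
final case class Suspend[A](thunk: () => Trampoline[A]) extends Trampoline[A]
final case class Bind[X, A](source: Trampoline[X], f: X => Trampoline[A]) extends Trampoline[A]

object Trampoline {
  def delay[A](a: => A): Trampoline[A] = Suspend(() => Now(a))

  // Interprets the description in a loop on the current thread
  @tailrec def run[A](t: Trampoline[A]): A = t match {
    case Now(a)         => a
    case Suspend(thunk) => run(thunk())
    case Bind(source, f) => source match {
      case Now(x)         => run(f(x))
      case Suspend(thunk) => run(thunk().flatMap(f))
      // left-nested binds get re-associated to the right
      case Bind(inner, g) => run(inner.flatMap(y => g(y).flatMap(f)))
    }
  }
}

object TrampolineDemo {
  // 100000 right-nested, suspended steps
  def countDown(n: Int): Trampoline[Int] =
    if (n == 0) Now(0) else Trampoline.delay(n - 1).flatMap(countDown)

  // 100000 left-nested maps
  val leftNested: Trampoline[Int] =
    (1 to 100000).foldLeft(Now(0): Trampoline[Int])((acc, _) => acc.map(_ + 1))
}
```

Both `countDown(100000)` and `leftNested` are interpreted by `Trampoline.run` without growing the call stack, which is what makes trampolined execution attractive for long `flatMap` chains.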
In general it's a good implementation; however, I believe the API could do better. In summary:
- the API leaks implementation details and requires manual intervention
- it has a dual synchronous/asynchronous personality
- it has liveness issues, which are especially important for JavaScript
- it has performance issues when forking logical threads
- it is unreliable for canceling running tasks
- its thread handling is unreliable
In more detail:
The `Future` (and consequently `Task`) in Scalaz is not about asynchronous execution, but about trampolined execution; the distinction is subtle, but you can see it in its API:
- by default, work gets executed on the current thread
- `Task.fork` can then force the execution to jump to another thread, but that's too much manual intervention for users: the only case in which this is user-friendly is when you don't want work to block the thread from which you call `.run`, so doing a `Task.fork` in that case is a no-brainer
- but for asynchronous work you never want to block the thread on which you call `.run`
Even though trampolined execution suggests asynchronicity, the current thread can be used (indefinitely), so the Scalaz `Task` has been used for getting results synchronously as well (which is why `def run: A` exists). It's been said that the root of all evil on the JVM is the unclear boundary between synchronous and asynchronous callbacks.
- for asynchronous work it's a good idea not to block the thread on which you call `.run`
- it's a good idea to split synchronous loops into batches such that no thread gets blocked indefinitely; you can do this with Scalaz by inserting `Task.fork` here and there, but it's a manual operation
- this is especially important on top of JavaScript, where concurrency is cooperative and where by blocking the run-loop you're blocking everything, including the browser's UI
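The batching idea can be sketched with nothing but the standard library: run a synchronous loop for at most a fixed number of iterations, then re-submit the rest of the loop to the `ExecutionContext`, giving other queued work a chance to run. `BatchedLoop` is a hypothetical name and the batch size of 512 is an arbitrary choice for this sketch; this is not how Monix or Scalaz implement it.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object BatchedLoop {
  // Arbitrary batch size for this sketch
  val BatchSize = 512

  // Sums 1..n, but runs at most BatchSize iterations per turn, then
  // re-submits the remainder to the executor instead of hogging the thread.
  def sum(n: Int)(implicit ec: ExecutionContext): Future[Long] = {
    val promise = Promise[Long]()

    def runBatch(from: Int, acc: Long): Unit = {
      var i = from
      var total = acc
      var steps = 0
      while (i <= n && steps < BatchSize) {
        total += i
        i += 1
        steps += 1
      }
      if (i > n) { promise.success(total); () }
      else {
        val next = i
        val partial = total
        ec.execute(new Runnable { def run(): Unit = runBatch(next, partial) })
      }
    }

    ec.execute(new Runnable { def run(): Unit = runBatch(1, 0L) })
    promise.future
  }
}

object BatchedLoopDemo {
  def run(): Long = {
    val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try Await.result(BatchedLoop.sum(100000), 10.seconds)
    finally pool.shutdown()
  }
}
```

On JavaScript the re-submission would have to go through something like `setTimeout` rather than a thread pool; either way, yielding between batches is what keeps the run-loop responsive.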
The Scalaz `Task` ends up being very efficient when running tail-recursive loops on the same thread, but when forking threads I've seen really bad throughput compared to both my implementation and Scala's `Future`. Maybe I'm doing something wrong.
Here's the stupid benchmark I'm doing: alexandru/39c124d0b8e7c66dc5c2.
Tasks are often used for expensive work that can be canceled (e.g. requests to external services, disk I/O, tail-recursive loops, etc.), and it would be nice to be able to cancel a running task. Scalaz's way of doing this is through `unsafePerformAsyncInterruptibly(f, AtomicBoolean)`, but that's ugly and does not address executing special logic on cancellation (like closing file handles).
So with the Scalaz `Task` you end up doing:

```scala
val cancel = new AtomicBoolean(false)
task.unsafePerformAsyncInterruptibly(callback, cancel)
// ...
cancel.set(true)
```
This is unsafe and a big hack: it only stops the trampoline; it does not stop in-progress I/O operations, it cannot close file handles, etc. Here's Monix's current API:

```scala
val result = task.runAsync
// ...
// oops, changed my mind
result.cancel()
```
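In contrast with a shared `AtomicBoolean`, a producer can hand back a cancelation handle that knows how to undo its own work. The following sketch uses only the JDK; `CancelHandle` and `CancelableProducer.delayed` are hypothetical names, not Monix's API. Canceling both prevents the pending value from being delivered and runs a cleanup hook, standing in for something like closing a file handle.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.{Success, Try}

// Hypothetical cancelation handle returned by the producer
trait CancelHandle { def cancel(): Unit }

object CancelableProducer {
  // Delivers `value` after `delayMillis`, unless canceled first. Canceling
  // stops the pending work AND runs `cleanup`, which a flag alone cannot do.
  def delayed[A](scheduler: ScheduledExecutorService, delayMillis: Long, value: A, cleanup: () => Unit)
                (cb: Try[A] => Unit): CancelHandle = {
    val pending = scheduler.schedule(new Runnable {
      def run(): Unit = cb(Success(value))
    }, delayMillis, TimeUnit.MILLISECONDS)

    new CancelHandle {
      private val canceled = new AtomicBoolean(false)
      def cancel(): Unit =
        if (canceled.compareAndSet(false, true)) { pending.cancel(false); cleanup() }
    }
  }
}

object CancelDemo {
  // Returns (value delivered?, cleanup ran?)
  def run(): (Boolean, Boolean) = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val delivered = new AtomicBoolean(false)
    val cleanedUp = new AtomicBoolean(false)
    val handle = CancelableProducer.delayed(scheduler, 200, "result", () => cleanedUp.set(true)) {
      _ => delivered.set(true)
    }
    handle.cancel()   // changed our mind before the value arrived
    Thread.sleep(400) // past the original deadline
    scheduler.shutdown()
    (delivered.get, cleanedUp.get)
  }
}
```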
The implementation of Scalaz's `Future` and `Task` is (currently) unreliable in terms of thread handling. For example, Scalaz's `Future` contains code that blocks threads, and it does so in an unsafe manner. It's unsafe because whenever you block a thread you should be required to specify a timeout. It's also unsafe (compared with `Await.result`) because you can exhaust all the threads in your thread pool, ending up with deadlocks. Here's a gem:
```scala
def unsafeStart: Future[A] = {
  val latch = new java.util.concurrent.CountDownLatch(1)
  @volatile var result: Option[A] = None
  unsafePerformAsync { a => result = Some(a); latch.countDown }
  delay { latch.await; result.get }
}
```
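For contrast, here is a sketch of the same latch-based idea with the two safeguards argued for above: a mandatory timeout, and the wait wrapped in `scala.concurrent.blocking`. `SaferBlocking.awaitCallback` is a hypothetical helper, not part of Scalaz or Monix, and it still parks a thread, so it remains a last resort.

```scala
import java.util.concurrent.{CountDownLatch, TimeoutException}
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.blocking
import scala.concurrent.duration._

object SaferBlocking {
  // Hypothetical helper: waits for a callback-based computation, but the
  // timeout is mandatory and the wait is wrapped in `blocking`, so a
  // BlockContext-aware pool gets a chance to compensate for the parked thread.
  def awaitCallback[A](timeout: FiniteDuration)(register: (A => Unit) => Unit): A = {
    val latch = new CountDownLatch(1)
    val result = new AtomicReference[Option[A]](None)
    register { a => result.set(Some(a)); latch.countDown() }
    val completed = blocking(latch.await(timeout.length, timeout.unit))
    if (!completed) throw new TimeoutException(s"callback not invoked within $timeout")
    result.get.get
  }
}
```

A callback that is never invoked now ends in a `TimeoutException` instead of a thread parked forever.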
I view `Task` as complementary to Scala's `Future`, not as a replacement. The problem we are trying to solve is that `Future` represents a running computation that memoizes its result. It's perfect for its primary use cases, but is sometimes misused and misunderstood:
- every one of `Future`'s operators takes an implicit `ExecutionContext` (making it slightly incompatible with various type-class interfaces)
- the reason is that every one of `Future`'s operators is side-effecting, immediately sending a task to a thread pool for execution (this actually makes sense when you're doing memoization)
- it goes without saying that `Future` is not referentially transparent
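The lack of referential transparency is easy to observe with the standard library alone: a `Future` starts running when it is constructed, so naming it and reusing the name is not the same as repeating the expression. `FutureIsNotRT` is a hypothetical name; the counters simply record how many times each effect ran.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureIsNotRT {
  // Returns (runs when the Future is shared, runs when the expression is inlined)
  def run(): (Int, Int) = {
    val sharedRuns = new AtomicInteger(0)
    // the effect starts as soon as the Future is constructed, and runs once
    val shared = Future { sharedRuns.incrementAndGet() }
    Await.result(for { _ <- shared; _ <- shared } yield (), 5.seconds)

    val inlinedRuns = new AtomicInteger(0)
    // writing the same expression twice runs the effect twice
    Await.result(
      for {
        _ <- Future { inlinedRuns.incrementAndGet() }
        _ <- Future { inlinedRuns.incrementAndGet() }
      } yield (),
      5.seconds)

    (sharedRuns.get, inlinedRuns.get)
  }
}
```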
Monix's `Task` fixes these problems. Well, for the FP purists, do note that Monix's `Task` still breaks the left identity monad law. The law does not hold for functions `f` that throw exceptions (just like with `Try` and `Future` before it), because `flatMap` catches the exception and turns it into a failed task, whereas calling `f(t)` directly throws:
```scala
// given f: T => Task[U] that throws an exception
Task(t).flatMap(f) === f(t) // does not hold
```
But to me that's unavoidable, and I don't even want to attempt fixing it just to tick a checkbox, as `Task` is about asynchronous execution, and that means dealing with non-determinism (on the underlying Java or JavaScript platforms).
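The same left identity violation can be reproduced with `Try` from the standard library: `Success.flatMap` catches non-fatal exceptions thrown by `f` and returns a `Failure`, while applying `f` directly lets the exception escape. `LeftIdentityDemo` is a hypothetical name used for illustration.

```scala
import scala.util.{Success, Try}

object LeftIdentityDemo {
  // A function that throws instead of returning a failed Try
  val f: Int => Try[Int] = x => Success(10 / x)

  // Via flatMap: the ArithmeticException is caught and becomes a Failure
  def viaFlatMap: Try[Int] = Try(0).flatMap(f)

  // Direct application: the exception escapes instead of being captured
  def direct: Either[Throwable, Try[Int]] =
    try Right(f(0)) catch { case e: ArithmeticException => Left(e) }
}
```

`viaFlatMap` is a `Failure`, while `direct` is a `Left` holding the thrown exception, so the two sides of the law are observably different.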
To answer a question: `x1` and `x2` will produce the same result in both Monix and Scalaz:
```scala
val x1 =
  for {
    _ <- Task { println("hi") }
    _ <- Task { println("hi") }
  } yield ()

val hi = Task { println("hi") }
val x2 =
  for {
    _ <- hi
    _ <- hi
  } yield ()
```
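To see why `x1` and `x2` are interchangeable, here is a sketch with a hypothetical `LazyTask` that merely wraps a thunk (no threads and no trampolining, so it is neither stack-safe nor Monix's or Scalaz's implementation). Because a task is only a description, binding `hi` twice runs its effect twice, exactly like writing the expression out twice, and nothing runs until `run()` is called.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical lazy task: a description wrapping a thunk; nothing runs until run()
final class LazyTask[A](val run: () => A) {
  def flatMap[B](f: A => LazyTask[B]): LazyTask[B] = new LazyTask(() => f(run()).run())
  def map[B](f: A => B): LazyTask[B] = new LazyTask(() => f(run()))
}

object LazyTask {
  def apply[A](a: => A): LazyTask[A] = new LazyTask(() => a)
}

object SameResultDemo {
  // Returns (effects before running, effects of x1, effects of x2)
  def run(): (Int, Int, Int) = {
    val log1 = ListBuffer.empty[String]
    val x1 = for {
      _ <- LazyTask { log1 += "hi"; () }
      _ <- LazyTask { log1 += "hi"; () }
    } yield ()

    val log2 = ListBuffer.empty[String]
    val hi = LazyTask { log2 += "hi"; () }
    val x2 = for {
      _ <- hi
      _ <- hi
    } yield ()

    val before = log1.size + log2.size // building x1 and x2 ran nothing
    x1.run()
    x2.run()
    (before, log1.size, log2.size)
  }
}
```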
The threading behavior will be different, however: when run, Monix will fork a single thread, whereas Scalaz will fork twice. But it gets better:
```scala
def loop(n: Int): Task[Unit] =
  if (n <= 0) Task.now(())
  else Task(println(n)).flatMap(_ => loop(n - 1))

// this forks a single thread in Monix and ~100 in Scalaz
loop(100).runAsync

// this forks ~40 threads in Monix on the JVM and ~80 tasks on JS
// (by means of setTimeout), but ~20400 logical threads in Scalaz,
// freezing the run-loop in JavaScript for the duration of that loop
loop(20400).runAsync
```
This is where Monix and Scalaz diverge in philosophy. To fix the above in Scalaz, you'd have to use `Task.delay` and then introduce `Task.fork` instructions from time to time. But that's a manual intervention that requires knowledge of how the Scalaz `Task` works, and it smells either like premature optimization or like a bug fix for a process that freezes the system.
Here's another difference, in the signature of `.apply()`:
```scala
// Scalaz
def apply[A](a: => A)(implicit pool: ExecutorService): Task[A]
// Monix
def apply[T](f: => T): Task[T]
```
As a matter of philosophy, the difference is that in Monix the execution of a `Task` is ALWAYS asynchronous, meaning that we expect tasks to fork at any moment and the user does not control the forking; so `runAsync` is where we take our `Scheduler`.
Also, Monix uses `Scheduler`, an enhanced `ExecutionContext` that's also supported on Scala.js, whereas Scalaz relies on Java's `ExecutorService`.
Here come the operators. This code is valid with both, but Monix's memory usage is more efficient (for the time being):
```scala
// Yes, this uses a lot of memory (because of that map),
// but should not trigger stack overflows.
// Also, remove the map, and you've got a tail recursion.
def sum(n: Int, acc: Long = 0): Task[Long] = {
  if (n == 0) Task(acc)
  else Task(n).flatMap(x => sum(x - 1, acc + x).map(_ + 1))
}
```
We can recover from an error (similar to `Future.recover` and `Future.recoverWith`; the same thing is possible with the Scalaz `Task`):
```scala
// Monix
monixTask.onErrorRecover {
  case _: SomeException =>
    "fallback value"
}

monixTask.onErrorRecoverWith {
  case _: SomeException =>
    Task("fallback value")
}
```
We can delay execution:
```scala
// Monix
monixTask.delay(10.minutes)
// Scalaz
scalazTask.after(10.minutes)
```
We can interrupt a task if it takes too long to execute:
```scala
// will fail with a `TimeoutException` after 10 seconds
monixTask.timeout(10.seconds)
// will fall back to a backup after 10 seconds of inactivity
monixTask.timeout(10.seconds, Task("another value"))
```
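A timeout boils down to racing the source against a timer. The sketch below does that for a plain `Future`, using a `Promise` and a JDK `ScheduledExecutorService`; `Timeouts.withTimeout` is a hypothetical helper, not Monix's implementation. Note that it cannot cancel the underlying computation; it only stops waiting for it. A fallback variant could be layered on top with `recoverWith`.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object Timeouts {
  // Mirrors `source`, unless `after` elapses first, in which case the result
  // fails with a TimeoutException; tryComplete lets only the first outcome win.
  def withTimeout[A](source: Future[A], after: FiniteDuration, timer: ScheduledExecutorService)
                    (implicit ec: ExecutionContext): Future[A] = {
    val promise = Promise[A]()
    val pending = timer.schedule(new Runnable {
      def run(): Unit = { promise.tryFailure(new TimeoutException(s"timed out after $after")); () }
    }, after.length, after.unit)
    source.onComplete { result =>
      pending.cancel(false) // the timer is no longer needed
      promise.tryComplete(result)
    }
    promise.future
  }
}

object TimeoutDemo {
  // Returns (value of the fast future, whether the never-completing one timed out)
  def run(): (Int, Boolean) = {
    val timer = Executors.newSingleThreadScheduledExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.global
    try {
      val fast = Await.result(Timeouts.withTimeout(Future.successful(1), 1.second, timer), 5.seconds)
      val never = Promise[Int]().future
      val timedOut =
        try { Await.result(Timeouts.withTimeout(never, 50.millis, timer), 5.seconds); false }
        catch { case _: TimeoutException => true }
      (fast, timedOut)
    } finally timer.shutdown()
  }
}
```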
We can zip two tasks:

```scala
val a = Task(1)
val b = Task(2)
val c: Task[(Int, Int)] = a.zip(b)
```
Unlike `Future`, a `Task` doesn't execute immediately; just like its Scalaz counterpart, it must be run before it produces its result and its side effects:
```scala
import scala.util.{Failure, Success}

// Execution with Monix (nothing has happened until now);
// Monix's Task always requires an execution context on run
import monix.execution.Scheduler.Implicits.global
monixTask.runAsync {
  case Success(value) => println(value)
  case Failure(ex) => ex.printStackTrace()
}

// Execution with Scalaz
import scalaz.{-\/, \/-}
scalazTask.unsafePerformAsync {
  case -\/(ex) => ex.printStackTrace()
  case \/-(value) => println(value)
}
```
Blocking the current thread for the result:
```scala
import scala.concurrent.Await
import scala.concurrent.duration._

// in Monix
Await.result(monixTask.runAsync, 10.seconds)
// in Scalaz
scalazTask.unsafePerformSyncFor(10.seconds)
```
Monix's method is actually better because we're piggybacking on `Await.result` from the standard library. This has the benefit that it communicates with Scala's (or soon to be Java's) ForkJoin pool, exposed by the default `ExecutionContext`, so that the pool can add more threads in case a blocking operation occurs (God forbid). It does so by means of the `blocking` block, in combination with Scala's `BlockContext`. And that's pretty cool.
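The `blocking`/`BlockContext` mechanism can be observed directly. Below, a hypothetical `CountingBlockContext` is installed for the current thread with `BlockContext.withBlockContext`, and each `blocking { ... }` section is routed through its `blockOn`. A real pool, such as the `ForkJoinPool` behind the default `ExecutionContext`, uses this same hook to decide whether to add a compensating thread; this sketch merely counts the calls.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{BlockContext, CanAwait, blocking}

// Hypothetical BlockContext that only records how often it was asked
// to run a blocking section (a real pool would compensate here)
final class CountingBlockContext extends BlockContext {
  val blockingCalls = new AtomicInteger(0)
  override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
    blockingCalls.incrementAndGet()
    thunk
  }
}

object BlockContextDemo {
  def run(): Int = {
    val ctx = new CountingBlockContext
    BlockContext.withBlockContext(ctx) {
      // `blocking` delegates to BlockContext.current, which is now `ctx`
      blocking { Thread.sleep(10) }
      blocking { Thread.sleep(10) }
    }
    ctx.blockingCalls.get
  }
}
```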
I'm also quite fond of my integration with Scala's `Future`. If you have to memoize the result in order to share it, then `Future` is there and does what it's supposed to. Plus, it surely beats callbacks. So Monix's `Task` has the following `runAsync` signature:
```scala
import scala.concurrent.Future
import monix.execution.Scheduler.Implicits.global

val task: Task[String] = ???
val future: Future[String] = task.runAsync

// conversion works both ways
val anotherTask = Task.fromFuture(Future("hello"))
```
And with Monix's `Task`, canceling running computations is a first-class concept:

```scala
val result = task.runAsync
// ... later ...
result.cancel()
```
A `Task` models a producer/consumer relationship in which the producer pushes a single value to the consumer on success, or an exception on error, and then stops. Thus we can have a consumer interface that mirrors the `Observer` in Rx, like so:
```scala
abstract class Callback[-T] { self =>
  def onSuccess(value: T, stackDepth: Int): Unit
  def onError(ex: Throwable, stackDepth: Int): Unit

  def safeOnSuccess(s: Scheduler, stackDepth: Int, value: T): Unit = ???
  def safeOnError(s: Scheduler, stackDepth: Int, ex: Throwable): Unit = ???
}
```
This type is not meant for users; in fact, they'll never see it. The contract is this:
- on success the producer calls `onSuccess` exactly once, then stops
- on error the producer calls `onError` exactly once, then stops
- by means of `stackDepth` the producer communicates the current stack depth, so upon reaching a maximum size (e.g. Monix has `Scheduler.recommendedBatchSize`, which is 512 on the JVM and 256 on JS), you can choose to push another task into your thread pool; that's the purpose of `safeOnSuccess` and `safeOnError`
- the API of this callback has changed several times already; I don't really care about it, as long as it works, because it is meant to be encapsulated, internal, hidden from users
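The `stackDepth` part of that contract can be sketched in isolation. `SimpleCallback` below is a simplified, hypothetical variant (it takes a plain `ExecutionContext` and an explicit `maxDepth` instead of a `Scheduler`), so treat it as an illustration of the idea rather than Monix's code: `safeOnSuccess` keeps calling synchronously while the depth is below the limit, then forks onto the executor and restarts at depth 0.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext

// Hypothetical, simplified callback honoring a stack-depth limit
abstract class SimpleCallback[-T] {
  def onSuccess(value: T, stackDepth: Int): Unit
  def onError(ex: Throwable, stackDepth: Int): Unit

  def safeOnSuccess(ec: ExecutionContext, maxDepth: Int, stackDepth: Int, value: T): Unit =
    if (stackDepth < maxDepth) onSuccess(value, stackDepth + 1)
    else ec.execute(new Runnable { def run(): Unit = onSuccess(value, 0) }) // fresh stack
}

object StackDepthDemo {
  // Pushes n, n-1, ..., 0 through one callback, where every onSuccess
  // triggers the next value; returns (finished, deepest depth observed).
  def run(n: Int, maxDepth: Int): (Boolean, Int) = {
    val pool = Executors.newSingleThreadExecutor()
    val ec = ExecutionContext.fromExecutor(pool)
    val done = new CountDownLatch(1)
    val deepest = new AtomicInteger(0)

    val cb = new SimpleCallback[Int] {
      def onSuccess(value: Int, depth: Int): Unit = {
        if (depth > deepest.get) deepest.set(depth)
        if (value == 0) done.countDown()
        else safeOnSuccess(ec, maxDepth, depth, value - 1)
      }
      def onError(ex: Throwable, depth: Int): Unit = done.countDown()
    }

    cb.safeOnSuccess(ec, maxDepth, 0, n)
    val finished = done.await(10, TimeUnit.SECONDS)
    pool.shutdown()
    (finished, deepest.get)
  }
}
```

`StackDepthDemo.run(100000, 512)` pushes 100,001 values through one callback without nesting more than 512 callback levels; calling `onSuccess` directly every time would nest all of them and would likely overflow a default JVM thread stack.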
So we can model our task:
```scala
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

sealed abstract class Task[+T] { self =>
  /** Characteristic function for our [[Task]]. Never use this directly. */
  protected def unsafeRunFn(
    scheduler: Scheduler,
    cancelable: MultiAssignmentCancelable,
    stackDepth: Int,
    callback: Callback[T]): Unit

  // ... (other members, such as stackSafeRun, elided)

  // Example operator
  def map[U](f: T => U): Task[U] =
    Task.unsafeCreate[U] { (scheduler, cancelable, depth, cb) =>
      self.stackSafeRun(scheduler, cancelable, depth, new Callback[T] {
        def onError(ex: Throwable, depth: Int): Unit =
          cb.safeOnError(scheduler, depth, ex)

        def onSuccess(value: T, depth: Int): Unit =
          try {
            val u = f(value)
            cb.safeOnSuccess(scheduler, depth, u)
          } catch {
            case NonFatal(ex) =>
              cb.safeOnError(scheduler, depth, ex)
          }
      })
    }
}

object Task {
  /** Creates a task, similar to Scalaz's Task.async */
  def create[T](signal: (Try[T] => Unit) => Cancelable): Task[T] =
    Task.unsafeCreate { (scheduler, cancelable, depth, cb) =>
      try {
        cancelable := signal {
          case Success(value) =>
            cb.safeOnSuccess(scheduler, depth, value)
          case Failure(ex) =>
            cb.safeOnError(scheduler, depth, ex)
        }
      } catch {
        case NonFatal(ex) =>
          cb.safeOnError(scheduler, depth, ex)
      }
    }

  /** Internal */
  private def unsafeCreate[T](f: (Scheduler, MultiAssignmentCancelable, Int, Callback[T]) => Unit): Task[T] =
    new Task[T] {
      def unsafeRunFn(s: Scheduler, c: MultiAssignmentCancelable, d: Int, cb: Callback[T]): Unit =
        f(s, c, d, cb)
    }
}
```
The overall philosophy is: ugly (but well encapsulated) internals, well behaved API.
Could you put together a side-by-side comparison between the design you're proposing here and Scalaz's `Task`? Something short, just bullet points with a few code snippets to illustrate usage.