This proposal is about Task, an alternative to Scala's Future, Scalaz's Task or C#'s Task.
Note this is work in progress. You can track the current progress on:
monifu/tree/task. The implementation currently consists of these classes:
- Task (new)
- Trampoline (new)
- Scheduler (an enhanced ExecutionContext, capable of scheduleOnce(delay), already in Monifu)
- EvictingQueue (an array-backed circular FIFO queue, used by our trampoline, already in Monifu)
- atomic references (already in Monifu)
I must say that I was impressed with the design of the Scalaz Task. The good parts of the Scalaz Task that my Task also has:
- I originally ignored stack overflows, but the authors of Scalaz Task did not; fret not, this Task implementation also protects against stack overflows
- in comparison with Scala's standard Future, a Task only describes an asynchronous computation; nothing is executed and no side-effects are triggered until Task.runAsync is called
- the Scalaz Task executes its operators on a trampoline by default, which makes it very efficient, and in case we want to parallelize, that choice can be made explicit; this is in stark contrast with Scala's Future and actually in line with Monifu's Observable, making me very happy :-) ... I copied the idea, though not the implementation
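To make the "only describes a computation" point concrete, here is a minimal sketch using nothing but the standard library. LazyTask is a hypothetical stand-in that I made up for illustration, not Monifu's Task: it has no trampoline and no operators, it just shows that building the value triggers nothing and that every run repeats the side-effect.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object LazyTaskSketch {
  // Hypothetical stand-in for a task: it only stores a description (a thunk).
  final class LazyTask[A](thunk: () => A) {
    // The execution context is requested only at the point of running.
    def runAsync(implicit ec: ExecutionContext): Future[A] = Future(thunk())
  }

  object LazyTask {
    // The builder takes no implicit pool; it captures the by-name argument.
    def apply[A](f: => A): LazyTask[A] = new LazyTask(() => f)
  }

  // Counts how many times the side-effect actually ran.
  val effects = new AtomicInteger(0)

  // Building the task does not run the side-effect.
  val task: LazyTask[Int] = LazyTask { effects.incrementAndGet() }

  // Each run evaluates the thunk again, so two runs mean two side-effects.
  def runTwice(): Int = {
    import ExecutionContext.Implicits.global
    Await.result(task.runAsync, 5.seconds)
    Await.result(task.runAsync, 5.seconds)
    effects.get()
  }
}
```

With a plain Future the counter would already be incremented by the time the val is initialized; here it stays at zero until runAsync is called.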
What my Task does better:
- clean API, with a dirty but rather elegant and well-encapsulated implementation: the internal design of Task follows that of Monifu's Observable and I must emphasise the better encapsulation - personally I feel there's no reason to expose the underlying Trampoline mechanics, because who knows, in the future I might not want to use a Trampoline
  - the Future in Scalaz is described as a "trampolined computation"; my Task by comparison is just an "asynchronous computation", trampolining is not guaranteed, as it would be an implementation leak
- the single-threaded performance of Monifu matches that of Scalaz, but when threads are forked, for some reason my implementation has more than twice the throughput, matching and even exceeding that of Scala's standard Future
- in the short and flawed benchmarks I've made, my implementation has better RAM usage characteristics, which makes sense because of the dirty implementation
- Monifu's Task always pushes tasks to the thread-pool (described by the given Scheduler) on runAsync - these are the user-facing methods and thus we are ensuring that the result is always asynchronous and that we are not blocking the current thread
  - on the other hand the operators themselves (e.g. map, flatMap) are using an internal Trampoline, so we do get the performance improvements
- Monifu's Task always requires a Scheduler on execution (because it's not the user who gets to decide whether a Scheduler / thread-pool is needed)
  - as a consequence, Monifu's Task does not take an implicit thread-pool on builders such as Task.apply, or a scheduler on utilities such as Task#delay: Monifu's Task has clean builders that do not leak implementation details ;-)
- Monifu has its own Scheduler (it inherits from Scala's ExecutionContext and is the equivalent of a ScheduledExecutorService), so we can work on Javascript and are not piggy-backing on Java's standard library
  - this implementation already runs on Scala.js and runs well
- Monifu's Task integrates with Scala's Future, having a runAsync that returns a Future, which for what it does (e.g. memoization, callback management) is the best choice, plus it's standard (see below)
Performance of the new Monifu Task is very close to that of the Scalaz Task when using the trampoline (slightly worse, but OK, and more improvements are possible). But when forking logical threads (e.g. Task.apply / Task.fork), the performance of Monifu's Task is much better, having more than twice the throughput (on my machine, with a stupid benchmark). One reason would be that the default thread-pool in the Scalaz Task is worse than Monifu's default ForkJoin pool, but that doesn't fully explain the difference; there's something really wrong there.
To answer a question, x1 and x2 will produce the same result and the same side-effects in both Monifu and Scalaz:
val x1 =
  for {
    _ <- Task { println("hi") }
    _ <- Task { println("hi") }
  } yield ()

val hi = Task { println("hi") }
val x2 =
  for {
    _ <- hi
    _ <- hi
  } yield ()
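For contrast, the same refactoring is not behavior-preserving with Scala's standard Future, because a Future starts running as soon as it is created and memoizes its result. A small sketch, with counters standing in for println so that the difference can be checked:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FutureSharingSketch {
  // Returns (side-effects with inlined futures, side-effects with a shared future).
  def count(): (Int, Int) = {
    import ExecutionContext.Implicits.global

    // Two separate Future blocks: the side-effect runs twice.
    val inlined = new AtomicInteger(0)
    val y1 = for {
      _ <- Future { inlined.incrementAndGet() }
      _ <- Future { inlined.incrementAndGet() }
    } yield ()
    Await.result(y1, 5.seconds)

    // One Future extracted into a val and reused: the side-effect runs once,
    // because the Future is already running and its result is cached.
    val shared = new AtomicInteger(0)
    val hi = Future { shared.incrementAndGet() }
    val y2 = for {
      _ <- hi
      _ <- hi
    } yield ()
    Await.result(y2, 5.seconds)

    (inlined.get(), shared.get())
  }
}
```

Calling FutureSharingSketch.count() yields (2, 1), whereas a lazy task gives the same count for both variants.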
Creating a task (that is supposed to be executed on another logical thread) is similar:
// creating task for execution on a separate logical thread
val task = Task {
  println("Executing ...")
  "Hello, world!"
}
But there is a difference here, in the signature:
// Scalaz
def apply[A](a: => A)(implicit pool: ExecutorService): Task[A]
// Monifu
def apply[T](f: => T): Task[T]
As a matter of philosophy, the difference is that in Monifu the execution of a Task is ALWAYS asynchronous, meaning that we expect tasks to fork at any moment (plus the trampoline is not unbounded and once we hit a limit, we fork), and so runAsync is where we take our Scheduler. Also, Monifu is using Scheduler, an enhanced ExecutionContext that's also supported on Scala.js, whereas Scalaz relies on Java's ExecutorService.
We can apply various operators on it. This code is valid with both:
// Yes, this uses a lot of memory (because of that map),
// but should not trigger stack overflows.
// Also, remove the map, and you've got a tail recursion.
def sum(n: Int, acc: Long = 0): Task[Long] = {
  if (n == 0) Task(acc) else
    Task(n).flatMap(x => sum(x-1, acc + x).map(_ + 1))
}
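To illustrate why a trampoline keeps this kind of recursion off the call stack, here is the same shape of computation written against scala.util.control.TailCalls from the standard library. This is only an analogy: TailRec is synchronous and is not what either Task implementation uses internally, but it shows the general technique of turning nested calls into heap-allocated steps that a loop then interprets.

```scala
import scala.util.control.TailCalls._

object TrampolinedSum {
  // Same recursion as the Task-based sum, expressed as trampolined steps.
  // The trailing map prevents plain tail recursion, yet the stack stays flat
  // because each step is suspended on the heap and run by `result`.
  def sum(n: Int, acc: Long = 0): TailRec[Long] =
    if (n == 0) done(acc)
    else tailcall(sum(n - 1, acc + n)).map(_ + 1)
}
```

A depth such as TrampolinedSum.sum(100000).result would typically overflow the stack as naive recursion, while here it only costs memory.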
We can recover from an error (similar to Future.recover and Future.recoverWith; the same thing is possible with the Scalaz Task):
// Monifu
monifuTask.onErrorRecover {
  case _: SomeException =>
    "fallback value"
}

monifuTask.onErrorRecoverWith {
  case _: SomeException =>
    Task("fallback value")
}
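For reference, these are the standard Future methods that the two operators mirror. The SomeException class below is a placeholder defined only for this sketch:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RecoverSketch {
  // Placeholder exception type, defined only for this example.
  final class SomeException(msg: String) extends RuntimeException(msg)

  def demo(): (String, String) = {
    import ExecutionContext.Implicits.global
    val failing: Future[String] = Future.failed(new SomeException("boom"))

    // recover maps the error to a plain value.
    val a = failing.recover { case _: SomeException => "fallback value" }

    // recoverWith maps the error to another asynchronous computation.
    val b = failing.recoverWith {
      case _: SomeException => Future.successful("fallback value")
    }

    (Await.result(a, 5.seconds), Await.result(b, 5.seconds))
  }
}
```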
We can delay execution:
// Monifu
task.delay(10.minutes)
// Scalaz
delay.delay(10.minutes)
We can interrupt a task if it takes too long to execute:
// will fail with a `TimeoutException` after 10 seconds
monifuTask.timeout(10.seconds)
// will fallback to a backup after 10 seconds of inactivity
monifuTask.timeout(10.seconds, Task("another value"))
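The general idea behind such a timeout can be sketched with a Promise that is raced between the source and a timer. This is not Monifu's implementation: withTimeout is a hypothetical helper, it uses a Java ScheduledExecutorService as the timer (which a Scheduler would replace), and it only abandons the slow result rather than interrupting the underlying work.

```scala
import java.util.concurrent.{Executors, TimeUnit, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object TimeoutSketch {
  // Hypothetical helper: whichever of `source` or the timer fires first wins.
  def withTimeout[A](source: Future[A], after: FiniteDuration)
                    (implicit ec: ExecutionContext): Future[A] = {
    val timer = Executors.newSingleThreadScheduledExecutor()
    val p = Promise[A]()

    timer.schedule(new Runnable {
      def run(): Unit = {
        // Has no effect if the source already completed the promise.
        p.tryFailure(new TimeoutException(s"timed out after $after"))
        timer.shutdown()
      }
    }, after.toMillis, TimeUnit.MILLISECONDS)

    source.onComplete { result =>
      p.tryComplete(result)
      timer.shutdownNow() // cancel the pending timeout, release the thread
    }
    p.future
  }

  def demo(): (Try[Int], Try[Int]) = {
    import ExecutionContext.Implicits.global
    val never = Promise[Int]().future // a computation that never finishes
    val slow = Try(Await.result(withTimeout(never, 100.millis), 5.seconds))
    val fast = Try(Await.result(withTimeout(Future.successful(1), 5.seconds), 5.seconds))
    (slow, fast)
  }
}
```

The fallback variant would complete the promise from a backup computation instead of failing it.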
We can zip 2 tasks:
val a = Task(1)
val b = Task(2)
val c: Task[(Int, Int)] = a.zip(b)
We've got some utilities inspired by Scala's Future:
val taskOfList: Task[Seq[T]] = Task.sequence(listOfTasks)
val task: Task[T] = Task.firstCompletedOf(listOfTasks)
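For comparison, these are the Future utilities that served as inspiration, shown with the standard library only. I am assuming the Task versions behave analogously, except that nothing runs until the resulting task is executed:

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object SequenceSketch {
  def demo(): (List[Int], Int) = {
    import ExecutionContext.Implicits.global

    // sequence: List[Future[Int]] => Future[List[Int]], order preserved.
    val all = Future.sequence(List(Future(1), Future(2), Future(3)))

    // firstCompletedOf: the result of whichever future finishes first;
    // `never` does not complete, so the other one always wins here.
    val never = Promise[Int]().future
    val first = Future.firstCompletedOf(List(never, Future.successful(42)))

    (Await.result(all, 5.seconds), Await.result(first, 5.seconds))
  }
}
```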
Contrary to Future, a Task doesn't execute immediately; just like its counterpart in Scalaz, it has to be run explicitly:
// Execution with Monifu (nothing has happened until now)
// Monifu's Task always requires an execution context on run
import monifu.concurrent.Implicits.globalScheduler
import scala.util.{Failure, Success}

monifuTask.runAsync {
  case Success(value) => println(value)
  case Failure(ex) => ex.printStackTrace()
}

// Execution with Scalaz
scalazTask.unsafePerformAsync {
  case -\/(ex) => ex.printStackTrace()
  case \/-(value) => println(value)
}
Blocking the current thread for the result:
// in Monifu
Await.result(monifuTask.runAsync, 10.seconds)
// in Scalaz
task.unsafePerformSyncFor(10.minutes)
Monifu's method is actually better because we're piggybacking on Await.result from the standard library. This has the benefit that it communicates with Scala's (or soon to be Java's) ForkJoin pool, exposed by the default ExecutionContext, to maybe add more threads in case a blocking operation occurs (God forbid). It does so by means of the blocking block, in combination with Scala's BlockContext. And that's pretty cool.
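The mechanism mentioned above looks like this in plain standard-library code. Wrapping the blocking section in blocking { ... } is a hint to the current BlockContext; the default global ExecutionContext may react by spawning a compensating thread, though whether it does is up to the pool:

```scala
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object BlockingSketch {
  def demo(): Int = {
    import ExecutionContext.Implicits.global
    val f = Future {
      // Signals to the pool that this thread is about to block.
      blocking {
        Thread.sleep(50)
        42
      }
    }
    // Await.result goes through the same blocking machinery internally.
    Await.result(f, 5.seconds)
  }
}
```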
I'm also quite fond of my integration with Scala's Future. If you have to memoize the result in order to share it, then Future is there and does what it's supposed to. Plus it surely beats callbacks. So Monifu's Task has the following runAsync signature:
import monifu.concurrent.Implicits.globalScheduler
val task: Task[T] = ???
val future: Future[T] = task.runAsync
// conversion works both ways
val anotherTask = Task.fromFuture(Future("hello"))
A Task models a producer/consumer relationship in which the producer pushes a single value to the consumer on success, or an exception on error, and then stops. Thus we can have a consumer interface that mirrors the Observer in Rx, like so:
abstract class Callback[-T] { self =>
  def onSuccess(value: T): Unit
  def onError(ex: Throwable): Unit

  // might execute onSuccess on a trampoline, or not :-)
  def asyncOnSuccess(s: Scheduler, value: T, fork: Boolean = false): Unit = ???
  // might execute onError on a trampoline, or not :-)
  def asyncOnError(s: Scheduler, ex: Throwable, fork: Boolean = false): Unit = ???
}
This type is not meant for users, but for people implementing operators or other utilities. The contract is this:
- on success the producer calls onSuccess exactly once, then stops
- on error the producer calls onError exactly once, then stops
- in operators, we should never call onSuccess or onError directly, as that can trigger stack overflows, but rather we should use the Trampoline or fork a new thread
- the contract is not meant for users, but for implementers of operators, or people using Task.unsafeCreate
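The "exactly once" part of the contract can be made tangible with a small guard. OnceCallback is a hypothetical wrapper written for this illustration only; the proposal itself relies on producers honoring the contract rather than on such a guard, and the Callback here is trimmed down to its two abstract methods.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer

object CallbackContractSketch {
  // Trimmed-down version of the consumer interface.
  abstract class Callback[-T] {
    def onSuccess(value: T): Unit
    def onError(ex: Throwable): Unit
  }

  // Hypothetical guard: forwards only the first signal, drops the rest.
  final class OnceCallback[T](underlying: Callback[T]) extends Callback[T] {
    private[this] val done = new AtomicBoolean(false)

    def onSuccess(value: T): Unit =
      if (done.compareAndSet(false, true)) underlying.onSuccess(value)

    def onError(ex: Throwable): Unit =
      if (done.compareAndSet(false, true)) underlying.onError(ex)
  }

  // A misbehaving producer signals three times; only the first gets through.
  def demo(): List[String] = {
    val events = ListBuffer.empty[String]
    val cb = new OnceCallback[Int](new Callback[Int] {
      def onSuccess(value: Int): Unit = { events += s"success: $value"; () }
      def onError(ex: Throwable): Unit = { events += s"error: ${ex.getMessage}"; () }
    })
    cb.onSuccess(1)
    cb.onSuccess(2)
    cb.onError(new RuntimeException("late"))
    events.toList
  }
}
```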
So we can model our task:
/**
 * For modeling asynchronous computations.
 */
trait Task[+T] { self =>
  /**
   * Characteristic function for our [[Task]].
   * Everything gets implemented based on this function.
   */
  protected def unsafeRunFn(scheduler: Scheduler, callback: Callback[T]): Unit

  // SAMPLE OPERATOR
  def map[U](f: T => U): Task[U] =
    Task.unsafeCreate[U] { (s,cb) =>
      self.unsafeRunFn(s, new Callback[T] {
        def onError(ex: Throwable): Unit =
          cb.asyncOnError(s, ex)

        def onSuccess(value: T): Unit =
          try {
            val u = f(value)
            cb.asyncOnSuccess(s, u)
          } catch {
            case NonFatal(ex) =>
              cb.asyncOnError(s, ex)
          }
      })
    }

  // ...
}
object Task {
  /** Returns a new task that, when executed, will emit the
    * result of the given function executed asynchronously.
    */
  def apply[T](f: => T): Task[T] =
    Task.unsafeCreate { (scheduler, callback) =>
      scheduler.execute(
        new Runnable {
          def run(): Unit = {
            // protecting against user code
            try callback.onSuccess(f) catch {
              case NonFatal(ex) =>
                callback.onError(ex)
            }
          }
        })
    }

  /** Builder for [[Task]] instances. For usage on implementing
    * operators or builders. Only use if you know what you're doing.
    */
  def unsafeCreate[T](f: (Scheduler, Callback[T]) => Unit): Task[T] =
    new Task[T] {
      override def unsafeRunFn(s: Scheduler, cb: Callback[T]): Unit =
        f(s,cb)
    }
}
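To see the pieces working end to end, here is a self-contained, simplified sketch that compiles against the standard library alone. It is not the Monifu code: MiniTask is a made-up name, a plain ExecutionContext stands in for Scheduler, operators call the callback directly instead of going through asyncOnSuccess / asyncOnError (so there is no trampolining and long chains are not stack-safe), and runAsync bridges to Future through a Promise.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object MiniTaskSketch {
  abstract class Callback[-T] {
    def onSuccess(value: T): Unit
    def onError(ex: Throwable): Unit
  }

  trait MiniTask[+T] { self =>
    // Characteristic function; everything else is derived from it.
    protected def unsafeRunFn(ec: ExecutionContext, cb: Callback[T]): Unit

    def map[U](f: T => U): MiniTask[U] =
      MiniTask.unsafeCreate[U] { (ec, cb) =>
        self.unsafeRunFn(ec, new Callback[T] {
          def onError(ex: Throwable): Unit = cb.onError(ex)
          // Try guards against exceptions thrown by the user's function.
          def onSuccess(value: T): Unit = Try(f(value)) match {
            case Success(u) => cb.onSuccess(u)
            case Failure(ex) => cb.onError(ex)
          }
        })
      }

    // The execution context is only needed here, at the point of running.
    def runAsync(implicit ec: ExecutionContext): Future[T] = {
      val p = Promise[T]()
      unsafeRunFn(ec, new Callback[T] {
        def onSuccess(value: T): Unit = { p.success(value); () }
        def onError(ex: Throwable): Unit = { p.failure(ex); () }
      })
      p.future
    }
  }

  object MiniTask {
    def apply[T](f: => T): MiniTask[T] =
      unsafeCreate[T] { (ec, cb) =>
        ec.execute(new Runnable {
          def run(): Unit = Try(f) match {
            case Success(v) => cb.onSuccess(v)
            case Failure(ex) => cb.onError(ex)
          }
        })
      }

    def unsafeCreate[T](f: (ExecutionContext, Callback[T]) => Unit): MiniTask[T] =
      new MiniTask[T] {
        override def unsafeRunFn(ec: ExecutionContext, cb: Callback[T]): Unit = f(ec, cb)
      }
  }

  def demo(): (Int, Try[Int]) = {
    import ExecutionContext.Implicits.global
    val ok = MiniTask(20).map(_ + 1).map(_ * 2)
    val failing = MiniTask[Int](throw new IllegalStateException("boom")).map(_ + 1)
    (Await.result(ok.runAsync, 5.seconds), Try(Await.result(failing.runAsync, 5.seconds)))
  }
}
```

The real design differs exactly where this sketch cuts corners: the callback hand-off goes through a trampoline or a fork, which is what keeps deep operator chains from blowing the stack.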
I'm actually hiding the implementation details of the trampoline (under the method calls asyncOnSuccess and asyncOnError), because they don't matter.
Could you put together a side-by-side comparison between the design you're proposing here and Scalaz's Task? Something short, just bullet points with a few code snippets to illustrate usage.