Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Task: A diverging design from Future and Scalaz Task

Task Proposal (for Monifu)

This proposal is about Task, an alternative for Scala's Future, Scalaz's Task or C#'s Task. We're using the following classes already defined in Monifu:

A Task models a producer/consumer relationship in which the producer pushes a single value to the consumer on success, or an exception on error and then stops. Thus we can have a consumer interface that mirrors the Observer in Rx, like so:

trait Callback[-T] {
  def scheduler: Scheduler
  def onSuccess(value: T): Unit
  def onError(ex: Throwable): Unit
}

The contract is this:

  • on success the producer calls onSuccess exactly once, then stops
  • on error the producer calls onError exactly once, then stops
  • in case producing the value is very cheap (value is already known), then execution of onSuccess is allowed to happen on the current thread (see Task.successfull below)
  • in case producing the value is expensive (if it's possible to block the current thread or to end up in a recursive loop), then execution should always be asynchronous and should preferably happen on the given Scheduler
  • the contract is not meant for users, but for implementers that use Task.unsafeCreate

So we can model our task like this, and note the full interoperability with Scala's Future and compare with the Scalaz Task:

/**
 * For modeling asynchronous computations.
 */
trait Task[+T] { self =>
  /**
   * Characteristic function for our [[Task]].
   */
  def unsafeRun(c: Callback[T]): Cancelable

  /**
   * Triggers asynchronous execution.
   */
  def unsafeRun(f: Try[T] => Unit)(implicit s: Scheduler): Cancelable =
    unsafeRun(new Callback[T] {
      val scheduler = s
      def onError(ex: Throwable): Unit = f(Failure(ex))
      def onSuccess(value: T): Unit = f(Success(value))
    })
  
  /**
   * Returns a new Task that applies the mapping function to
   * the element emitted by the source.
   */
  def map[U](f: T => U): Task[U] =
    Task.unsafeCreate[U] { callback =>
      self.unsafeRun(new Callback[T] {
        val scheduler = callback.scheduler

        def onSuccess(value: T): Unit = {
          var streamError = true
          try {
            val u = f(value)
            streamError = false
            callback.onSuccess(u)
          } catch {
            case NonFatal(ex) if streamError =>
              onError(ex)
          }
        }

        def onError(ex: Throwable): Unit =
          callback.onError(ex)
      })
    }

  /**
   * Given a source Task that emits another Task, this function
   * flattens the result, returning a Task equivalent to the
   * emitted Task by the source.
   */
  def flatten[U](implicit ev: T <:< Task[U]): Task[U] =
    Task.unsafeCreate { callbackU =>
      val cancelable = MultiAssignmentCancelable()
      cancelable := self.unsafeRun(new Callback[T] {
        val scheduler = callbackU.scheduler
        def onSuccess(value: T): Unit =
          cancelable := value.unsafeRun(callbackU)
        def onError(ex: Throwable): Unit =
          callbackU.onError(ex)
      })

      cancelable
    }

  /**
   * Creates a new Task by applying a function to the successful
   * result of the source Task, and returns a task equivalent to
   * the result of the function.
   *
   * Calling `flatMap` is literally the equivalent of:
   * {{{
   *   task.map(f).flatten
   * }}}
   */
  def flatMap[U](f: T => Task[U]): Task[U] =
    map(f).flatten

  /**
   * Returns a task that on execution returns the result of the source
   * task but delayed in time by the given `timestamp`.
   */
  def delay(timespan: FiniteDuration): Task[T] =
    Task.unsafeCreate[T] { callback =>
      val cancelable = MultiAssignmentCancelable()
      cancelable := callback.scheduler.scheduleOnce(timespan,
        new Runnable {
          override def run(): Unit = {
            cancelable := self.unsafeRun(callback)
          }
        })

      cancelable
    }

  /**
   * Returns a failed projection of this task.
   *
   * The failed projection is a future holding a value of type `Throwable`,
   * emitting a value which is the throwable of the original task in
   * case the original task fails, otherwise if the source succeeds, then
   * it fails with a `NoSuchElementException`.
   */
  def failed: Task[Throwable] =
    Task.unsafeCreate { callback =>
      self.unsafeRun(new Callback[T] {
        val scheduler = callback.scheduler

        def onError(ex: Throwable): Unit =
          callback.onSuccess(ex)

        def onSuccess(value: T): Unit =
          callback.onError(new NoSuchElementException("Task.failed"))
      })
    }

  /**
   * Creates a new task that will handle any matching throwable
   * that this task might emit.
   */
  def onErrorRecover[U >: T](pf: PartialFunction[Throwable, U]): Task[U] =
    Task.unsafeCreate { callbackU =>
      self.unsafeRun(new Callback[T] {
        val scheduler = callbackU.scheduler

        def onError(ex: Throwable): Unit = {
          var streamError = false
          try {
            if (pf.isDefinedAt(ex)) {
              val u = pf(ex)
              streamError = true
              callbackU.onSuccess(u)
            } else {
              callbackU.onError(ex)
            }
          } catch {
            case NonFatal(err) if streamError =>
              callbackU.scheduler.reportFailure(ex)
              callbackU.onError(err)
          }
        }

        def onSuccess(value: T): Unit =
          callbackU.onSuccess(value)
      })
    }

  /**
   * Creates a new task that will handle any matching throwable that this
   * task might emit by executing another task.
   */
  def onErrorRecoverWith[U >: T](pf: PartialFunction[Throwable, Task[U]]): Task[U] = {
    Task.unsafeCreate { callbackU =>
      val cancelable = MultiAssignmentCancelable()

      cancelable := self.unsafeRun(new Callback[T] {
        val scheduler = callbackU.scheduler

        def onError(ex: Throwable): Unit = {
          var streamError = true
          try {
            if (pf.isDefinedAt(ex)) {
              val newTask = pf(ex)
              streamError = false
              cancelable := newTask.unsafeRun(callbackU)
            } else {
              callbackU.onError(ex)
            }
          } catch {
            case NonFatal(err) if streamError =>
              callbackU.scheduler.reportFailure(ex)
              callbackU.onError(err)
          }
        }

        def onSuccess(value: T): Unit =
          callbackU.onSuccess(value)
      })

      cancelable
    }
  }

  /**
   * Returns a Task that mirrors the source Task but that triggers a
   * `TimeoutException` in case the given duration passes without the
   * task emitting any item.
   */
  def timeout(after: FiniteDuration): Task[T] =
    Task.unsafeCreate { callback =>
      val c = CompositeCancelable()

      c += callback.scheduler.scheduleOnce(after,
        new Runnable {
          def run(): Unit = {
            if (c.cancel())
              callback.onError(new TimeoutException(
                s"Task timed-out after $after of inactivity"))
          }
        })

      c += self.unsafeRun(new Callback[T] {
        val scheduler = callback.scheduler
        def onError(ex: Throwable): Unit =
          if (c.cancel()) callback.onError(ex)
        def onSuccess(value: T): Unit =
          if (c.cancel()) callback.onSuccess(value)
      })

      Cancelable(c.cancel())
    }

  /**
   * Returns a Task that mirrors the source Task but switches to
   * the given backup Task in case the given duration passes without the
   * source emitting any item.
   */
  def timeout[U >: T](after: FiniteDuration, backup: Task[U]): Task[U] =
    Task.unsafeCreate { callback =>
      val isActive = CompositeCancelable()
      val cancelable = MultiAssignmentCancelable(isActive)

      isActive += callback.scheduler.scheduleOnce(after,
        new Runnable {
          def run(): Unit = {
            if (isActive.cancel())
              cancelable := backup.unsafeRun(callback)
          }
        })

      isActive += self.unsafeRun(new Callback[T] {
        val scheduler = callback.scheduler
        def onError(ex: Throwable): Unit =
          if (isActive.cancel()) callback.onError(ex)
        def onSuccess(value: T): Unit =
          if (isActive.cancel()) callback.onSuccess(value)
      })

      cancelable
    }

  /**
   * Zips the values of `this` and `that` task, and creates a new task that
   * will emit the tuple of their results.
   */
  def zip[U](that: Task[U]): Task[(T, U)] =
    Task.unsafeCreate { callbackTU =>
      val c = CompositeCancelable()
      val state = Atomic(null : Either[T, U])

      c += self.unsafeRun(
        new Callback[T] {
          val scheduler = callbackTU.scheduler
          def onError(ex: Throwable): Unit = {
            if (c.cancel())
              callbackTU.onError(ex)
          }

          @tailrec
          def onSuccess(value: T): Unit =
            state.get match {
              case null =>
                if (!state.compareAndSet(null, Left(value)))
                  onSuccess(value)
              case Right(u) =>
                callbackTU.onSuccess((value, u))
              case Left(_) =>
                ()
            }
        })


      c += that.unsafeRun(
        new Callback[U] {
          val scheduler = callbackTU.scheduler
          def onError(ex: Throwable): Unit = {
            if (c.cancel())
              callbackTU.onError(ex)
          }

          @tailrec
          def onSuccess(value: U): Unit =
            state.get match {
              case null =>
                if (!state.compareAndSet(null, Right(value)))
                  onSuccess(value)
              case Left(l) =>
                callbackTU.onSuccess((l, value))
              case Right(_) =>
                ()
            }
        })

      Cancelable(c.cancel())
    }

  /**
   * Converts this task into a Scala `Future`, triggering
   * its execution.
   */
  def asFuture(implicit s: Scheduler): Future[T] = {
    val p = Promise[T]()
    self.unsafeRun(new Callback[T] {
      val scheduler = s
      def onError(ex: Throwable): Unit =
        p.tryFailure(ex)
      def onSuccess(value: T): Unit =
        p.trySuccess(value)
    })

    p.future
  }
}

object Task {
  /**
   * Returns a new task that, when executed, will emit the
   * result of the given function executed asynchronously.
   */
  def apply[T](f: => T): Task[T] =
    Task.unsafeCreate { callback =>
      val cancelable = BooleanCancelable()
      callback.scheduler.execute(
        new Runnable {
          override def run(): Unit =
            if (!cancelable.isCanceled) {
              try callback.onSuccess(f) catch {
                case NonFatal(ex) =>
                  callback.onError(ex)
              }
            }
        })

      cancelable
    }

  /**
   * Builder for [[Task]] instances. Only use if you know what
   * you're doing.
   */
  def unsafeCreate[T](f: Callback[T] => Cancelable): Task[T] =
    new Task[T] {
      def unsafeRun(c: Callback[T]): Cancelable = f(c)
    }

  /**
   * Returns a task that on execution is always successful,
   * emitting the given element.
   */
  def successful[T](elem: T): Task[T] =
    Task.unsafeCreate { callback =>
      try callback.onSuccess(elem) catch {
        case NonFatal(ex) =>
          callback.onError(ex)
      }

      Cancelable.empty
    }

  /**
   * Returns a task that on execution is always finishing
   * in error emitting the specified exception.
   */
  def error(ex: Throwable): Task[Nothing] =
    Task.unsafeCreate { c =>
      c.onError(ex)
      Cancelable.empty
    }

  /**
   * Converts the given `Future` into a `Task`.
   */
  def fromFuture[T](f: => Future[T]): Task[T] =
    Task.unsafeCreate { callback =>
      implicit val s = callback.scheduler
      val cancelable = Cancelable()
      f.onComplete {
        case Success(value) =>
          if (cancelable.cancel())
            callback.onSuccess(value)

        case Failure(ex) =>
          if (cancelable.cancel())
            callback.onError(ex)
      }
      cancelable
    }
}

Until now our API is very similar to more pure Task implementations. But this doesn't cover all usecases that Scala's Future have and one problem is one of memoization. We may want to memoize the result of the last execution such that it can be shared by anybody who executes unsafeRun. So what if we build a version of our Task that can do just that:

trait MemoizableTask[+T] extends Task[T] {
  def isCompleted: Boolean
  def value: Option[Try[T]]
  
  // inherited from Task
  def unsafeRun(c: Callback[T]): Cancelable
}

object MemoizableTask {
  /** Transforms any Task into a memoizable Task */
  def apply(underlying: Task[T]): MemoizableTask[T]
}

Oh, but then we've got a problem: the first unsafeRun execution will pay the full cost of the underlying Task execution, whereas our unsafeRun executions subsequent to our task being completed will come for free. And here we've got these design issues:

  1. even though returning a Cancelable on unsafeRun is common sense, we can no longer return Cancelable on unsafeRun, because we are supposed to memoize the result and reuse it for multiple consumers, so once created, it makes no sense to give the user the ability to cancel
  2. not returning a Cancelable on unsafeRun highlights another fact: if we are to memoize the result for subsequent unsafeRun executions, then it makes sense to start executing this task as soon as possible, as we want to memoize it as soon as possible

But then again, we've arrived at Scala's Future and so it makes no sense to duplicate the effort. We might as well convert to Scala's Future should we need it and be done with it:

trait Task[+T] {
  def unsafeRun(c: Callback[T]): Cancelable
  // ...
  
  /**
   * Converts this task into a Scala `Future`, triggering
   * its execution.
   */
  def asFuture(implicit s: Scheduler): Future[T] = {
    val p = Promise[T]()
    self.unsafeRun(new Callback[T] {
      val scheduler = s
      def onError(ex: Throwable): Unit =
        p.tryFailure(ex)
      def onSuccess(value: T): Unit =
        p.trySuccess(value)
    })

    p.future
  }  
}

Oh wait, what just happened? Do we have a Task that's complementary to Scala's Future? Yes we do.

@milessabin
Copy link

Could you put together a side-by-side comparison between the design you're proposing here and Scalaz's Task? Something short, just bullet points with a few code snippets to illustrate usage.

@alexandru
Copy link
Author

Yes, I will. I'm currently fleshing out the implementation, since this didn't exist yesterday morning and after that I'll provide a comparison :-)

@alexandru
Copy link
Author

Updated :-)

@tpolecat
Copy link

Thanks for this. Very interesting.

Can you clarify whether these programs the same or not?

val x =
  for {
    _ <- Task { println("hi") }
    _ <- Task { println("hi") }
  } yield ()

and

val hi = Task { println("hi") }
val x =
  for {
  _ <- hi
  _ <- hi
} yield ()

@adelbertc
Copy link

Depending on how the final implementation looks, perhaps you may also want to see how suitable it may be to be in Cats itself? :-)

That being said, we do have a WIP branch here - I believe the progress stopped at making it stack safe, with tests showing it StackOverflowError-ing.

@alexandru
Copy link
Author

@tpolecat yes, it behaves like you'd expect, the two programs are the same.

@adelbertc sure; btw, this implementation is stack safe

@ngbinh
Copy link

ngbinh commented Dec 23, 2015

looks great! 👍

Copy link

ghost commented Dec 23, 2015

@tpolecat re question from cats gitter: "is there a better place to discuss this?"

@adelbertc re WIP branch

Both points were brought up by @non in https://github.com/non/cats/issues/32#issuecomment-141079117, but that was a while ago. Specifically:

I think using this issue to brainstorm, ask questions, and help try to flesh out the requirements of a task in Cats would definitely be helpful.

and

Thus, feature/task (in its current state) is not eligible to be merged.

Just as a suggestion, perhaps the old issue has got stale and rather than try and reboot it, it might be worth launching a V2 issue?

@alexandru
Copy link
Author

👍 @inthenow.

I have updated the document. It now contains a list of what this implementation does better than the Task in Scalaz, as I think @milessabin wanted. See the "What's better than Scalaz?" section.

Copy link

ghost commented Dec 23, 2015

Cool. Before I/we do this, there are two issues regarding moving the current Future instances from Cats to Alleycats - non/alleycats#26 and https://github.com/non/cats/issues/589.

Given this - would Alleycats be the better place to open the new issue and possible first implementation?

@alexandru
Copy link
Author

Changed again (Dec 30).

@puffnfresh
Copy link

scalaz.concurrent.Task is not very good. Using just a reasonable IO type, it should be possible to subsume Task's use cases with some uses of MVar-like atomic-references. For example:

scalaz/scalaz@series/7.3.x...puffnfresh:feature/concurrent-io

I highly recommend focusing on doing a good job at I/O and then using that to talk about concurrency, rather than trying to do the opposite.

@gpampara
Copy link

@puffnfresh: 👍

@aloiscochard
Copy link

Hi all,

You might be interested to look at my matterhorn experiment: https://github.com/aloiscochard/matterhorn

It is basically a specialized interpreter for IO actions: https://github.com/aloiscochard/matterhorn/blob/master/core/src/main/scala/rts/Interpreter.scala

There is some tests and benchmarks showing how it work:
https://github.com/aloiscochard/matterhorn/blob/master/core/src/test/scala/CoreSpec.scala
https://github.com/aloiscochard/matterhorn/blob/master/bench/src/main/scala/pure.scala

The approach to concurrency is to use a STM, I just did plug a library for having rapid prototyping.

Good luck

@alexandru
Copy link
Author

@aloiscochard, @puffnfresh thanks, will take a look.

@pchiusano
Copy link

scalaz.concurrent.Task is not very good. Using just a reasonable IO type, it should be possible to subsume Task's use cases with some uses of MVar-like atomic-references. For example:

I agree, and that is the route we are going with FS2's Task type (which I suppose could be renamed to 'IO'), which implements MVar-like references and then uses that to implement async (and lots of other operations, like parallelTraverse, as well as all the concurrent stream operations of FS2 itself): https://github.com/functional-streams-for-scala/fs2/blob/topic/redesign/core/src/main/scala/fs2/Async.scala#L75

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment