Last active
March 27, 2017 17:16
-
-
Save luciferous/bcf67216f21cc2bac3b5 to your computer and use it in GitHub Desktop.
Too Cool for Spool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.concurrent | |
import com.twitter.util.{Await, Future} | |
/**
 * One non-empty cell of an [[AsyncStream]]: the element at the front of the
 * stream together with the stream of everything that follows it.
 *
 * Declared as a value class (`extends AnyVal`) around the underlying pair.
 *
 * @param tuple the `(head, tail)` pair this cell wraps
 */
protected case class Cons[A](tuple: (A, AsyncStream[A])) extends AnyVal {

  /** The element at the front of the stream (first component of the pair). */
  def head: A = {
    this.tuple._1
  }

  /** The rest of the stream (second component of the pair). */
  def tail: AsyncStream[A] = {
    this.tuple._2
  }
}
/**
 * AsyncStream is an asynchronous stream: the next cell of the stream is only
 * available through a `Future`.
 *
 * A stream is represented as a `Future[Option[Cons[A]]]`. `None` marks the end
 * of the stream; `Some(cell)` carries the head element and the tail stream.
 *
 * @param go by-name computation producing the first cell of the stream. It is
 *           evaluated lazily, the first time the cell is needed, and its
 *           result is then reused for every later access.
 */
final class AsyncStream[A] private (go: => Future[Option[Cons[A]]]) {

  // `go` is a by-name argument, so a plain `def cons = go` would re-run the
  // producing computation (e.g. the function handed to `unfoldF`) every time
  // the cell is inspected. Memoizing it guarantees the computation behind a
  // given stream instance runs at most once, however often it is traversed.
  protected lazy val cons: Future[Option[Cons[A]]] = go

  /**
   * Builds a new stream by handing the first cell of this stream, if any, to
   * `f`. An empty stream stays empty. The work is deferred until the
   * resulting stream's own first cell is requested.
   */
  @inline final def fold[B](f: Cons[A] => AsyncStream[B]): AsyncStream[B] = AsyncStream(
    cons.flatMap {
      case None => Future.None
      case Some(cell) => f(cell).cons
    }
  )

  /** Applies `f` to every element and concatenates the resulting streams in order. */
  def flatMap[B](f: A => AsyncStream[B]): AsyncStream[B] = fold { cell =>
    f(cell.head) ++ cell.tail.flatMap(f)
  }

  /** Transforms every element of the stream with `f`. */
  def map[B](f: A => B): AsyncStream[B] = fold { cell =>
    f(cell.head) *:: cell.tail.map(f)
  }

  /** Keeps only the elements for which `f` returns `true`. */
  def filter(f: A => Boolean): AsyncStream[A] = fold { cell =>
    if (f(cell.head)) cell.head *:: cell.tail.filter(f)
    else cell.tail.filter(f)
  }

  /** Alias for [[filter]], so that guards can be used in `for` comprehensions. */
  def withFilter(f: A => Boolean): AsyncStream[A] = filter(f)

  /**
   * Concatenates two streams: all elements of this stream followed by all
   * elements of `that`.
   *
   * @param that the stream to append; passed by name, so it is only evaluated
   *             once this stream has been exhausted
   */
  def ++(that: => AsyncStream[A]): AsyncStream[A] = AsyncStream(
    cons.flatMap {
      case None => that.cons
      case Some(cell) => (cell.head *:: (cell.tail ++ that)).cons
    }
  )

  /**
   * Folds the whole stream from the left, starting with `z`.
   *
   * @return a `Future` of the final accumulator, or of `z` for an empty stream
   */
  def foldLeft[B](z: B)(f: (B, A) => B): Future[B] =
    cons.flatMap {
      case None => Future.value(z)
      case Some(cell) => cell.tail.foldLeft(f(z, cell.head))(f)
    }

  /**
   * Like [[foldLeft]], but the combining function is itself asynchronous.
   * The fold only moves on to the tail once the `Future` returned by `f` for
   * the current element has produced its value.
   */
  def foldLeftF[B](z: B)(f: (B, A) => Future[B]): Future[B] =
    cons.flatMap {
      case None => Future.value(z)
      case Some(cell) => f(z, cell.head).flatMap(acc => cell.tail.foldLeftF(acc)(f))
    }

  /** Like [[map]], but the transformation is itself asynchronous. */
  def mapF[B](f: A => Future[B]): AsyncStream[B] = AsyncStream(
    cons.flatMap {
      case None => Future.None
      case Some(cell) => f(cell.head).flatMap(b => (b *:: cell.tail.mapF(f)).cons)
    }
  )

  /**
   * Prepends `a` to this stream. The operator is right-associative, so it is
   * written `a *:: stream`.
   */
  def *::(a: A): AsyncStream[A] = AsyncStream(
    // Pass the pair explicitly rather than relying on argument auto-tupling.
    Future.value(Some(new Cons((a, this))))
  )

  /**
   * Exposes the first cell of the stream as a plain `(head, tail)` pair, or
   * `None` if the stream is empty.
   */
  def force: Future[Option[(A, AsyncStream[A])]] = cons.map(_.map(_.tuple))

  /** Same as [[toStream]], typed as a `Seq`. Blocks in the same way. */
  def toSeq: Seq[A] = toStream

  /**
   * Converts to a standard `Stream`, blocking the calling thread with
   * `Await.result` until the first cell is available. Later cells are awaited
   * as the returned `Stream` is traversed.
   */
  def toStream: Stream[A] =
    Await.result(cons) match {
      case None => Stream.empty
      case Some(cell) => cell.head #:: cell.tail.toStream
    }
}
/** Constructors for [[AsyncStream]]. */
object AsyncStream {

  /** The stream with no elements. */
  def empty[A]: AsyncStream[A] = AsyncStream(Future.None)

  /**
   * Wraps a computation of the first cell into a stream.
   *
   * @param go passed by name, so it is not evaluated when the stream is built
   */
  def apply[A](go: => Future[Option[Cons[A]]]): AsyncStream[A] = new AsyncStream(go)

  /**
   * Builds a stream by repeatedly applying `f` to a state value, starting
   * from `z`. `f` returns `None` to end the stream, or `Some((a, b))` to emit
   * `a` and continue from state `b`.
   *
   * Each step is deferred until the corresponding cell is requested, so
   * building the stream does not call `f`, and an `f` that never returns
   * `None` describes an unbounded stream instead of recursing forever at
   * construction time.
   */
  def unfold[A, B](z: B)(f: B => Option[(A, B)]): AsyncStream[A] = AsyncStream(
    // The whole match is the by-name argument of `apply`; the recursive
    // `unfold` call likewise only builds a deferred stream.
    f(z) match {
      case None => Future.None
      case Some((a, b)) => (a *:: unfold(b)(f)).cons
    }
  )

  /**
   * Like [[unfold]], but each step is asynchronous: `f` returns a `Future` of
   * the optional next element and state.
   */
  def unfoldF[A, B](z: B)(f: B => Future[Option[(A, B)]]): AsyncStream[A] = AsyncStream(
    f(z).flatMap {
      case None => Future.None
      case Some((a, b)) => (a *:: unfoldF(b)(f)).cons
    }
  )
}
Can we make this explicit in the docs? I used Spool and just found AsyncStream. Would love at least a reference that AsyncStream is generally easier to use and is the path forward.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
`protected def cons` needs to be a lazy val.
This does definitely look like an improvement... it could be worthwhile to follow through on getting this in and deprecating Spool.