Skip to content

Instantly share code, notes, and snippets.

@luciferous
Last active March 27, 2017 17:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save luciferous/bcf67216f21cc2bac3b5 to your computer and use it in GitHub Desktop.
Save luciferous/bcf67216f21cc2bac3b5 to your computer and use it in GitHub Desktop.
Too Cool for Spool
package com.twitter.concurrent
import com.twitter.util.{Await, Future}
/**
* Value class to wrap the Cons cell.
*/
protected case class Cons[A](tuple: (A, AsyncStream[A])) extends AnyVal {
def head: A = tuple._1
def tail: AsyncStream[A] = tuple._2
}
/**
* AsyncStream is an asynchronous stream.
*/
final class AsyncStream[A] private (go: => Future[Option[Cons[A]]]) {
protected def cons: Future[Option[Cons[A]]] = go
@inline final def fold[B](f: Cons[A] => AsyncStream[B]): AsyncStream[B] = AsyncStream(
cons.flatMap {
case None => Future.None
case Some(cons) => f(cons).cons
}
)
def flatMap[B](f: A => AsyncStream[B]): AsyncStream[B] = fold { cons =>
f(cons.head) ++ cons.tail.flatMap(f)
}
def map[B](f: A => B): AsyncStream[B] = fold { cons =>
f(cons.head) *:: cons.tail.map(f)
}
def filter(f: A => Boolean): AsyncStream[A] = fold {
case cons if f(cons.head) => cons.head *:: cons.tail.filter(f)
case cons => cons.tail.filter(f)
}
def withFilter(f: A => Boolean): AsyncStream[A] = filter(f)
def ++(that: => AsyncStream[A]): AsyncStream[A] = AsyncStream(
cons.flatMap {
case None => that.cons
case Some(cons) => (cons.head *:: (cons.tail ++ that)).cons
}
)
def foldLeft[B](z: B)(f: (B, A) => B): Future[B] =
cons.flatMap {
case None => Future.value(z)
case Some(cons) => cons.tail.foldLeft(f(z, cons.head))(f)
}
def foldLeftF[B](z: B)(f: (B, A) => Future[B]): Future[B] =
cons.flatMap {
case None => Future.value(z)
case Some(cons) => f(z, cons.head).flatMap(cons.tail.foldLeftF(_)(f))
}
def mapF[B](f: A => Future[B]): AsyncStream[B] = AsyncStream(
cons.flatMap {
case None => Future.None
case Some(cons) => f(cons.head).flatMap(b => (b *:: cons.tail.mapF(f)).cons)
}
)
def *::(a: A): AsyncStream[A] = AsyncStream(
Future.value(Some(new Cons(a, this)))
)
def force: Future[Option[(A, AsyncStream[A])]] = cons.map(_.map(_.tuple))
def toSeq: Seq[A] = toStream
def toStream: Stream[A] =
Await.result(cons) match {
case None => Stream.empty
case Some(cons) => cons.head #:: cons.tail.toStream
}
}
object AsyncStream {
def empty[A]: AsyncStream[A] = AsyncStream(Future.None)
def apply[A](go: => Future[Option[Cons[A]]]): AsyncStream[A] = new AsyncStream(go)
def unfold[A, B](z: B)(f: B => Option[(A, B)]): AsyncStream[A] =
f(z) match {
case None => AsyncStream.empty
case Some((a, b)) => a *:: unfold(b)(f)
}
def unfoldF[A, B](z: B)(f: B => Future[Option[(A, B)]]): AsyncStream[A] = AsyncStream(
f(z).flatMap {
case None => Future.None
case Some((a, b)) => (a *:: unfoldF(b)(f)).cons
}
)
}
@stuhood
Copy link

stuhood commented Feb 25, 2015

protected def cons needs to be a lazy val.

This does definitely look like an improvement... it could be worthwhile to follow through on getting this in and deprecating Spool.

@baugarten
Copy link

Can we make this explicit in the docs? I used Spool and just found AsyncStream. Would love at least a reference that AsyncStream is generally easier to use and is the path forward

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment