Skip to content

Instantly share code, notes, and snippets.

@luciferous
Last active March 27, 2017 17:16
Show Gist options
  • Save luciferous/bcf67216f21cc2bac3b5 to your computer and use it in GitHub Desktop.
Save luciferous/bcf67216f21cc2bac3b5 to your computer and use it in GitHub Desktop.
Too Cool for Spool
package com.twitter.concurrent
import com.twitter.util.{Await, Future}
/**
* Value class to wrap the Cons cell.
*/
protected case class Cons[A](tuple: (A, AsyncStream[A])) extends AnyVal {
def head: A = tuple._1
def tail: AsyncStream[A] = tuple._2
}
/**
* AsyncStream is an asynchronous stream.
*/
final class AsyncStream[A] private (go: => Future[Option[Cons[A]]]) {
protected def cons: Future[Option[Cons[A]]] = go
@inline final def fold[B](f: Cons[A] => AsyncStream[B]): AsyncStream[B] = AsyncStream(
cons.flatMap {
case None => Future.None
case Some(cons) => f(cons).cons
}
)
def flatMap[B](f: A => AsyncStream[B]): AsyncStream[B] = fold { cons =>
f(cons.head) ++ cons.tail.flatMap(f)
}
def map[B](f: A => B): AsyncStream[B] = fold { cons =>
f(cons.head) *:: cons.tail.map(f)
}
def filter(f: A => Boolean): AsyncStream[A] = fold {
case cons if f(cons.head) => cons.head *:: cons.tail.filter(f)
case cons => cons.tail.filter(f)
}
def withFilter(f: A => Boolean): AsyncStream[A] = filter(f)
def ++(that: => AsyncStream[A]): AsyncStream[A] = AsyncStream(
cons.flatMap {
case None => that.cons
case Some(cons) => (cons.head *:: (cons.tail ++ that)).cons
}
)
def foldLeft[B](z: B)(f: (B, A) => B): Future[B] =
cons.flatMap {
case None => Future.value(z)
case Some(cons) => cons.tail.foldLeft(f(z, cons.head))(f)
}
def foldLeftF[B](z: B)(f: (B, A) => Future[B]): Future[B] =
cons.flatMap {
case None => Future.value(z)
case Some(cons) => f(z, cons.head).flatMap(cons.tail.foldLeftF(_)(f))
}
def mapF[B](f: A => Future[B]): AsyncStream[B] = AsyncStream(
cons.flatMap {
case None => Future.None
case Some(cons) => f(cons.head).flatMap(b => (b *:: cons.tail.mapF(f)).cons)
}
)
def *::(a: A): AsyncStream[A] = AsyncStream(
Future.value(Some(new Cons(a, this)))
)
def force: Future[Option[(A, AsyncStream[A])]] = cons.map(_.map(_.tuple))
def toSeq: Seq[A] = toStream
def toStream: Stream[A] =
Await.result(cons) match {
case None => Stream.empty
case Some(cons) => cons.head #:: cons.tail.toStream
}
}
object AsyncStream {
def empty[A]: AsyncStream[A] = AsyncStream(Future.None)
def apply[A](go: => Future[Option[Cons[A]]]): AsyncStream[A] = new AsyncStream(go)
def unfold[A, B](z: B)(f: B => Option[(A, B)]): AsyncStream[A] =
f(z) match {
case None => AsyncStream.empty
case Some((a, b)) => a *:: unfold(b)(f)
}
def unfoldF[A, B](z: B)(f: B => Future[Option[(A, B)]]): AsyncStream[A] = AsyncStream(
f(z).flatMap {
case None => Future.None
case Some((a, b)) => (a *:: unfoldF(b)(f)).cons
}
)
}
@baugarten
Copy link

Can we make this explicit in the docs? I used Spool and just found AsyncStream. Would love at least a reference that AsyncStream is generally easier to use and is the path forward

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment