Skip to content

Instantly share code, notes, and snippets.

@johnynek
Last active December 18, 2015 23:09
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save johnynek/5859909 to your computer and use it in GitHub Desktop.
Save johnynek/5859909 to your computer and use it in GitHub Desktop.
A simple API using Future and Promise to create an async stream.
// could easily be ported to Scala Future
import com.twitter.util.{Promise, Future}
trait Source[+T] { self =>
def head: Option[T]
def tail: Future[Source[T]]
def map[U](fn: T => U): Source[U] = new Source[U] {
def head = self.head.map(fn)
def tail = self.tail.map { _.map(fn) }
}
def concat[U>:T](that: Future[Source[U]]): Future[Source[U]] =
if(head.isEmpty)
that
else Future.value(new Source[U] {
def head = self.head
def tail = self.tail.flatMap { _.concat(that) }
})
def foldLeft[U](init: Future[U])(fn: (U,T) => Future[U]): Future[U] =
if(head.isEmpty) init
else for {
u <- init
tailS <- tail
res <- tailS.foldLeft(fn(u,head.get))(fn)
} yield res
// The monad is on Future[Source[T]]
//def flatMap[U](fn: T => Future[Source[T]]): Future[Source[T]]
// TODO: add methods in terms of the above here:
// take(n: Int): Source[T], takeWhile, drop, scan
// cue the monad discussion that we really just want a Stream[M[_], T] with some Monad[M]
}
object Source {
def empty: Source[Nothing] = new Source[Nothing] {
def head = None
def tail = Future.exception(new Exception("Empty"))
}
def fromStream[T](s: Stream[T]): Source[T] = new Source[T] {
def head = if(s.isEmpty) None else Some(s.head)
def tail = Future.value(fromStream(s.tail))
}
}
trait Sink[T] {
def source: Future[Source[T]]
// only put once, returns the next Sink to put into
def put(t: T): Sink[T]
def finish: Unit
}
object Sink {
def empty[T]: Sink[T] = consumer(new Promise[(Option[T], Future[Source[T]])]())
private def consumer[T](promise: Promise[(Option[T], Future[Source[T]])]): Sink[T] = new Sink[T] {
def source = promise.map { ht =>
new Source[T] {
def head = ht._1
def tail = ht._2
}
}
def put(t: T): Sink[T] = {
val nextSink = empty[T]
promise.setValue((Some(t), nextSink.source))
nextSink
}
def finish: Unit = {
promise.setValue(None, empty[T].source)
}
}
}
/** Now play with it:
scala> Sink.empty[Int]
res10: Sink[Int] = Sink$$anon$1@2d51f840
scala> res10.put(10)
res11: Sink[Int] = Sink$$anon$1@6a9bbaed
scala> res11.put(11)
res12: Sink[Int] = Sink$$anon$1@26efec43
scala> res12.put(12)
res13: Sink[Int] = Sink$$anon$1@3f44c5bd
scala> res10.tail.get.tail.get.head.get ^
res14: Int = 12
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment