Skip to content

Instantly share code, notes, and snippets.

@mgodave
Last active June 15, 2016 21:53
Show Gist options
  • Save mgodave/a521e4d88d18224720322ee04b29a732 to your computer and use it in GitHub Desktop.
Save mgodave/a521e4d88d18224720322ee04b29a732 to your computer and use it in GitHub Desktop.
import com.twitter.concurrent.AsyncStream
import com.twitter.util._
/**
 * Helper combinators for `AsyncStream`: merging several streams into one and
 * making a stream terminable on demand.
 */
object Merge {

  /**
   * Merges `streams` into a single stream, emitting each element as soon as one of the
   * underlying streams makes its next element available.
   *
   * The merged stream ends once every input stream has ended. If any input stream fails,
   * the merged stream fails with the same cause at that point.
   *
   * @param streams the streams to merge; may be empty, in which case the result is empty
   * @tparam E element type
   * @return a stream carrying the elements of all input streams
   */
  def merge[E](streams: Seq[AsyncStream[E]]): AsyncStream[E] = {
    // `next` holds one pending `uncons` per input stream that has not ended yet.
    def inner(next: Seq[Future[Option[(E, () => AsyncStream[E])]]]): AsyncStream[E] =
      if (next.isEmpty) {
        // `Future.select` fails with IllegalArgumentException on an empty Seq, so the
        // "nothing left to wait for" case is handled here. This also makes
        // `merge(Seq.empty)` produce an empty stream instead of a failed one.
        AsyncStream.empty[E]
      } else {
        AsyncStream.fromFuture(Future.select(next)).flatMap {
          case (Return(Some((head, tail))), tails) =>
            // Emit the element and keep racing the rest of this stream against the others.
            // NOTE(review): the winner's tail is prepended; if `Future.select` favours
            // earlier entries when several are ready, a busy stream may dominate — confirm
            // whether appending would be preferable for fairness.
            AsyncStream(head) ++ inner(tail().uncons +: tails)
          case (Return(None), tails) =>
            // This stream has ended; continue with the remaining ones (possibly none).
            inner(tails)
          case (Throw(cause), _) =>
            AsyncStream.fromFuture(Future.exception(cause))
        }
      }

    inner(streams.map(_.uncons))
  }

  /**
   * Wraps `stream` so that it can be cut short from the outside.
   *
   * @param stream the stream to wrap
   * @tparam E element type
   * @return the wrapped stream together with a `Closable`; once the `Closable` has been
   *         closed, the wrapped stream ends at its next step instead of yielding further
   *         elements. Closing more than once has no additional effect.
   */
  def short[E](stream: AsyncStream[E]): (AsyncStream[E], Closable) = {
    val promise = Promise[Unit]()

    // Satisfied with `None` (i.e. "end of stream") when the closable is closed. Created once
    // and reused for every element rather than rebuilding a control stream per step.
    val closed: Future[Option[(E, () => AsyncStream[E])]] = promise.map(_ => None)

    def inner(data: AsyncStream[E]): AsyncStream[E] =
      if (promise.isDefined) {
        // Already closed: stop right away, even if `data` has elements ready, so that an
        // eagerly available upstream cannot keep winning the race below.
        AsyncStream.empty[E]
      } else {
        AsyncStream.fromFuture(Future.select(Seq(data.uncons, closed))).flatMap {
          case (Return(None), _) =>
            // Either the data ended or the close signal fired.
            AsyncStream.empty[E]
          case (Return(Some((head, tail))), _) =>
            AsyncStream(head) ++ inner(tail())
          case (Throw(cause), _) =>
            AsyncStream.fromFuture(Future.exception(cause))
        }
      }

    val closable = Closable.make { _ =>
      // `updateIfEmpty` makes repeated closes harmless.
      promise.updateIfEmpty(Return.Unit)
      Future.Done
    }

    (inner(stream), closable)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment