Skip to content

Instantly share code, notes, and snippets.

@timcharper
Created July 20, 2015 03:28
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save timcharper/d943b6a8300942d1b31f to your computer and use it in GitHub Desktop.
Save timcharper/d943b6a8300942d1b31f to your computer and use it in GitHub Desktop.
val input = (Stream.continually(Promise[Unit]) zip Range.inclusive(1, 5)).toList
implicit val materializer = ActorMaterializer()
val source = AckedSource(input)
val Seq(s1, s2) = (1 to 2) map { n => AckedSink.fold[Int, Int](0)(_ + _) }
val g = AckedFlowGraph.closed(s1, s2)((m1, m2) => (m1, m2)) { implicit b =>
(s1, s2) =>
import AckedFlowGraph.Implicits._
val broadcast = b.add(AckedBroadcast[Int](2))
source ~> broadcast ~> s1
broadcast ~> s2
}
// If AckedBroadcast failed to create a unique promise for each stream, then "java.lang.IllegalStateException: Promise already completed" would be thrown.
val (f1, f2) = g.run()
val (r1, r2) = (await(f1), await(f2))
r1 should be (15)
r2 should be (15)
input.foreach { case (p, _) => await(p.future) }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment