Skip to content

Instantly share code, notes, and snippets.

@beezee
Created October 27, 2015 12:13
Show Gist options
  • Save beezee/7fb24d2d7b89bf783cf2 to your computer and use it in GitHub Desktop.
Save beezee/7fb24d2d7b89bf783cf2 to your computer and use it in GitHub Desktop.
Akka Streams with acknowledgement using bcast ~> zip
package main
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorAttributes}
import akka.stream.scaladsl._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
/**
 * Example Akka Streams program demonstrating "acknowledgement" of a message
 * once some derived stream work has finished, by broadcasting each input to
 * two branches and zipping the branch outputs back together.
 *
 * Each input is a pair `(ack, payload)`:
 *  - one branch keeps only the `ack` part ([[msgFlow]]);
 *  - the other branch processes the `payload` part in a nested stream and
 *    collects its results ([[streamFlow]]).
 *
 * [[ackFlow]] emits `(ack, results)` once both branches have produced their
 * element for a given input.
 */
object StreamProgram {

  // A single actor system and materializer are shared by every stream in this
  // object, including the nested streams started inside `streamFlow`. `run()`
  // must shut down this same system, so it must not declare its own.
  implicit val system: ActorSystem = ActorSystem("example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /** Keeps only the first element (the "ack" token) of each incoming pair. */
  def msgFlow: Flow[(String, String), String, Unit] =
    Flow[(String, String)].map(_._1)

  /** Result element produced by the nested stream: `(payload, 1, "baz")`. */
  type Dumb = (String, Int, String)

  /**
   * For each incoming pair, runs a nested stream over ten copies of the
   * pair's second element, maps every copy through two asynchronous stages,
   * and folds the results into a list.
   *
   * The nested stages use `mapAsyncUnordered` and the fold prepends, so the
   * order of elements inside the resulting list carries no meaning.
   *
   * @return a flow emitting one `List[Dumb]` per input pair
   */
  def streamFlow: Flow[(String, String), List[Dumb], Unit] =
    Flow[(String, String)]
      .mapAsync(10)(x => Source(List.fill(10)(x._2))
        .mapAsyncUnordered(10)(x => Future { (x, 1) })
        .mapAsyncUnordered(10)(x => Future { (x._1, x._2, "baz") })
        .runWith(Sink.fold[List[Dumb], Dumb](List.empty[Dumb])((l, e) => e :: l)))

  /**
   * Broadcasts each input pair to [[msgFlow]] and [[streamFlow]] and zips the
   * two outputs, yielding the ack token together with the collected results.
   */
  val ackFlow: Flow[(String, String), (String, List[Dumb]), Unit] =
    Flow() { implicit b =>
      import FlowGraph.Implicits._

      val broadcast = b.add(Broadcast[(String, String)](2))
      val zip = b.add(Zip[String, List[Dumb]])

      broadcast ~> msgFlow ~> zip.in0
      broadcast ~> streamFlow ~> zip.in1

      (broadcast.in, zip.out)
    }

  /**
   * Feeds ten `("ack", "foo")` pairs through [[ackFlow]], prints every
   * emitted element, and shuts the actor system down when the stream
   * completes (successfully or not).
   */
  def run(): Unit = {
    Source(List.fill(10)(("ack", "foo")))
      .via(ackFlow)
      .runWith(Sink.foreach(println))
      .onComplete { result =>
        // Surface a stream failure instead of discarding it silently.
        result.failed.foreach(e => System.err.println(s"Stream failed: $e"))
        // Always stop the shared system so its threads do not keep the JVM alive.
        system.shutdown()
      }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment