Skip to content

Instantly share code, notes, and snippets.

/post.scala

Created May 31, 2017
Embed
What would you like to do?
the description for this gist
// ActorSystem and ActorMaterializer available in implicit scope
akkaSource
.mapAsync(1) {
case CommitableMsg(MsgA(a), offset) =>
someBusinessFuture(a).map(_ => offset)
case CommitableMsg(_, offset) =>
Future.successful(offset)
}
.groupedWithin(100, 1.seconds)
.filter(_.nonEmpty)
.mapAsync(1)(_.last.commitOffset)
.via(killSwitch.flow)
.runWith(Sink.ignore)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment