Created May 31, 2017 19:59
the description for this gist
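// Assumes an implicit ActorSystem, ActorMaterializer, and ExecutionContext are in scope.
import scala.concurrent.Future
import scala.concurrent.duration._

akkaSource
  // Process one message at a time so offsets are emitted in order.
  .mapAsync(1) {
    case CommitableMsg(MsgA(a), offset) =>
      someBusinessFuture(a).map(_ => offset)
    case CommitableMsg(_, offset) =>
      // Other message types need no processing; pass the offset through.
      Future.successful(offset)
  }
  // Batch up to 100 offsets, or whatever arrived within one second.
  .groupedWithin(100, 1.seconds)
  .filter(_.nonEmpty)
  // Commit only the last offset of each batch (commitOffset is expected to return a Future).
  .mapAsync(1)(_.last.commitOffset)
  .via(killSwitch.flow)
  .runWith(Sink.ignore)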