@timcharper
Last active August 29, 2015 14:25
Akka-Stream-Acknowledgements
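import scala.concurrent.duration._

// Consume change notifications for MainTable and RelatedTable from RabbitMQ.
// qos = 100 is the channel prefetch limit (unacknowledged messages in flight).
val source = Source(RabbitSource(
  "Worker",
  rabbitMq,
  channel(qos = 100),
  consume(topic("app.domain.class", topics = List(Pg.Table("MainTable").all, Pg.Table("RelatedTable").all))),
  body(as[PgChange]).map(getPrimaryId)))

source.
  // Pair each primary id with the handle `p` used to acknowledge it later.
  map { case (p, primaryId) => Delivery(p, primaryId) }.
  // Keep only deliveries whose primary id is defined, unwrapping the Option.
  mapConcat { delivery =>
    Delivery.collect(delivery :: Nil) { case Some(i) => i }
  }.
  // Batch up to 100 deliveries, or whatever arrived within 60 seconds.
  groupedWithin(100, 60.seconds).
  // Within each batch, merge deliveries that share a primary id into one.
  mapConcat {
    _.groupBy(_.data).toList.map { case (primaryId, deliveries) =>
      Delivery.reduce(deliveries)(Keep.left)
    }
  }.
  // Push changes for up to 4 ids concurrently; each delivery's
  // acknowledgement is tied to the future returned by its push.
  mapAsync(4) { case d @ Delivery(_, primaryId) =>
    d.ack(api.pushChangesFor(primaryId))
  }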
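
The gist does not show how `Delivery` is defined. The following is a minimal sketch of one way such a helper could work, using only `scala.concurrent`. It is an assumption, not the author's implementation: the field name `promise`, the signatures of `collect` and `reduce`, and the choice to acknowledge filtered-out messages immediately are all hypothetical. It only illustrates the idea of carrying an acknowledgement promise alongside each element through the stream.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}

// Hypothetical stand-in for the gist's Delivery: a payload paired with the
// promise that acknowledges the upstream message once it is completed.
final case class Delivery[T](promise: Promise[Unit], data: T) {
  // Tie the acknowledgement to the outcome of `work`: success acknowledges,
  // failure fails the promise with the same exception.
  def ack(work: Future[Any])(implicit ec: ExecutionContext): Future[Unit] =
    promise.completeWith(work.map(_ => ())).future
}

object Delivery {
  // Keep the elements matched by `pf`. Elements that are dropped are
  // acknowledged right away (an assumption: they need no further work).
  def collect[T, U](ds: List[Delivery[T]])(pf: PartialFunction[T, U]): List[Delivery[U]] =
    ds.flatMap { d =>
      if (pf.isDefinedAt(d.data)) List(Delivery(d.promise, pf(d.data)))
      else { d.promise.trySuccess(()); Nil }
    }

  // Merge several deliveries into one. Completing the merged promise
  // completes every original promise with the same result.
  // `ds` must be non-empty.
  def reduce[T](ds: List[Delivery[T]])(f: (T, T) => T): Delivery[T] = {
    val merged = Promise[Unit]()
    ds.foreach(d => d.promise.completeWith(merged.future))
    Delivery(merged, ds.map(_.data).reduce(f))
  }
}
```

With this sketch, `Delivery.reduce(deliveries)((left, _) => left)` plays the role of `Delivery.reduce(deliveries)(Keep.left)` in the pipeline: duplicate primary ids collapse into a single push, and one call to `ack` on the merged delivery acknowledges every message that contributed to it.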