Skip to content

Instantly share code, notes, and snippets.

@timcharper
Created July 20, 2015 03:02
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save timcharper/364abe540705ca3923a7 to your computer and use it in GitHub Desktop.
// RabbitSource in op-rabbit v1.0.0-M9 returns an AckedSource
val source = RabbitSource(
"Worker",
rabbitMq,
channel(qos = 100),
consume(topic("app.domain.class", topics = List(Pg.Table("MainTable").all, Pg.Table("RelatedTable").all))),
body(as[PgChange]).map(getPrimaryId))
source.
collect { case Some(i) => i }.
groupedWithin(100, 60 seconds).
mapConcat { _.distinct }.
mapAsync(4) { primaryId =>
api.pushChangesFor(primaryId)
}.
runAck
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment