@ghostbuster91
Last active May 7, 2020 10:07
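import monix.reactive.Observable

// NOTE: `config` is not defined in this snippet; it is assumed to be in scope.
class WithdrawBatchingExecutor(repository: WithdrawalRepository,
                               externalSystem: ExternalWithdrawSystem) {

  def startExecutor: Observable[Unit] =
    Observable
      // Poll the repository, waiting `config.interval` between ticks.
      .intervalWithFixedDelay(config.interval)
      .mapEval(_ => repository.findAllWaitingOrderedBySerialIdAsc())
      // Skip polls that return the same list as the previous one.
      .distinctUntilChanged
      // Keep the leading withdrawals not present in the accumulator.
      // `oldSet` is the value this scan emitted last, not the previous poll.
      .scan(List.empty[Withdrawal]) { (oldSet, newSet) =>
        newSet.takeWhile(w => !oldSet.contains(w))
      }
      .flatMap(Observable.fromIterable)
      // One sub-stream per currency, each batched independently.
      .groupBy(_.currency)
      .mergeMap { partition =>
        partition
          // Emit a batch when the time window elapses or the size threshold is reached.
          .bufferTimedAndCounted(config.timeWindow, config.batchSizeThreshold)
          .mapEval(batch => externalSystem.executeWithdrawals(batch))
      }
}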