Skip to content

Instantly share code, notes, and snippets.

@KadekM
Last active May 10, 2021 20:14
Show Gist options
  • Save KadekM/19c4991152743155f41440dc94b27ee4 to your computer and use it in GitHub Desktop.
Save KadekM/19c4991152743155f41440dc94b27ee4 to your computer and use it in GitHub Desktop.
.tap(x => log.info(x.value.toString)) // log each record's value before the chunk is processed
.foreachChunk { chunk =>
  // Handles one chunk of records: every TransferBalance command carried by the
  // chunk and the chunk's offsets are written through the same `session`
  // inside a single transaction, which is committed at the end.
  // Storing the offsets together with the balance changes presumably makes
  // processing and offset tracking succeed or fail as a unit -- confirm
  // against the code that reads the stored offsets on startup.

  // Offsets of the whole chunk, keyed by the string form of their partition.
  val offsetsList = OffsetBatch(chunk.map(_.offset)).offsets
    .map { case (topicPartition, offset) => partitionToString(topicPartition) -> offset }
    .toList
  val commands = chunk.map(_.value)

  session.transaction.toManagedZIO.use { transaction =>
    for {
      // Prepare the balance statement once per chunk and reuse it for every
      // transfer, instead of preparing (and releasing) it twice per command.
      _ <- session
             .prepare(Sql.changeBalanceCommand)
             .toManagedZIO
             .use { changeBalance =>
               ZIO.foreach_(commands) {
                 // NOTE(review): only TransferBalance is matched; if the command
                 // type has other variants this fails with a MatchError -- confirm.
                 case TransferBalance(from, to, amount) =>
                   for {
                     _ <- changeBalance.execute(from ~ -amount) // notice the minus sign
                     _ <- changeBalance.execute(to ~ amount)
                   } yield ()
               }
             }
      // Record the chunk's offsets with a command built for this offsets list.
      _ <- session
             .prepare(Sql.updatePartitionCommand(offsetsList))
             .toManagedZIO
             .use(_.execute(offsetsList))
      _ <- transaction.commit
    } yield ()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment