Skip to content

Instantly share code, notes, and snippets.

@tobiberger
Last active July 20, 2020 18:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tobiberger/178f436d23fa40f388f4c891921cf0f7 to your computer and use it in GitHub Desktop.
Save tobiberger/178f436d23fa40f388f4c891921cf0f7 to your computer and use it in GitHub Desktop.
import java.time.ZonedDateTime
/**
 * A serialized message together with the date at which it expires.
 *
 * Equality and hashing compare [messageSerialized] by content rather than by
 * array identity, so two instances carrying the same bytes and the same
 * [expiryDate] are equal.
 *
 * @property messageSerialized the message payload as raw bytes.
 * @property expiryDate the expiry date associated with this message.
 */
data class MessageWithExpiry(
    val messageSerialized: ByteArray,
    val expiryDate: ZonedDateTime
) {
    // The generated data-class equals() would compare the ByteArray by reference.
    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (other !is MessageWithExpiry) return false
        return expiryDate == other.expiryDate &&
            messageSerialized.contentEquals(other.messageSerialized)
    }

    // Must stay consistent with equals(): hash the array contents, not its identity.
    override fun hashCode(): Int =
        31 * messageSerialized.contentHashCode() + expiryDate.hashCode()
}
/**
 * A group of serialized messages plus a single expiry date for the group.
 *
 * Equality and hashing compare [messages] deeply (element by element, byte by
 * byte) rather than by array identity.
 *
 * @property messages the serialized payloads contained in this batch.
 * @property latestMessageExpiryDate the expiry date recorded for the batch.
 */
data class Batch(
    val messages: Array<ByteArray>,
    val latestMessageExpiryDate: ZonedDateTime
) {
    // The generated data-class equals() would compare the outer array by reference.
    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (other !is Batch) return false
        return latestMessageExpiryDate == other.latestMessageExpiryDate &&
            messages.contentDeepEquals(other.messages)
    }

    // Must stay consistent with equals(): hash nested array contents.
    override fun hashCode(): Int =
        31 * messages.contentDeepHashCode() + latestMessageExpiryDate.hashCode()
}
/** Upper bound, in bytes, on the summed serialized size of the messages in one batch (8 MiB). */
private const val MAX_BATCH_SIZE = 8 * 1024 * 1024
/**
 * Lazily groups the messages of this sequence into [Batch]es, preserving order.
 *
 * Messages are accumulated until adding the next one would push the summed
 * payload size above [MAX_BATCH_SIZE]; the accumulated messages are then
 * emitted as one batch and a new batch is started with that next message.
 *
 * A single message larger than [MAX_BATCH_SIZE] is not rejected: it is emitted
 * in a batch of its own, which therefore exceeds the limit.
 *
 * @return a sequence of batches; empty when the receiver is empty.
 */
fun Sequence<MessageWithExpiry>.batchMessages(): Sequence<Batch> = sequence {
    val pending = mutableListOf<MessageWithExpiry>()
    // Long accumulator: after an oversized message the running total can sit close
    // to Int.MAX_VALUE, and an Int sum would wrap negative and skip the flush.
    var pendingBytes = 0L
    for (message in this@batchMessages) {
        val messageSize = message.messageSerialized.size
        // Never flush an empty batch, so an oversized message still gets emitted.
        if (pending.isNotEmpty() && pendingBytes + messageSize > MAX_BATCH_SIZE) {
            yield(pending.toBatch())
            pending.clear()
            pendingBytes = 0L
        }
        pending += message
        pendingBytes += messageSize
    }
    // Emit whatever is left once the input is exhausted.
    if (pending.isNotEmpty()) {
        yield(pending.toBatch())
    }
}
/**
 * Builds a [Batch] from the messages in this list.
 *
 * The batch holds every message's serialized payload in list order, and its
 * expiry date is the greatest `expiryDate` found among the messages (by
 * `compareTo` ordering).
 *
 * @return the batch for the messages currently in the list.
 * @throws IllegalArgumentException if the list is empty, since no expiry date can be derived.
 */
fun MutableList<MessageWithExpiry>.toBatch(): Batch {
    require(isNotEmpty()) { "Cannot build a Batch from an empty message list" }
    // Plain fold instead of maxBy: its nullability differs between Kotlin versions.
    val latestExpiry = fold(first().expiryDate) { latest, message ->
        if (message.expiryDate > latest) message.expiryDate else latest
    }
    return Batch(
        messages = map { it.messageSerialized }.toTypedArray(),
        latestMessageExpiryDate = latestExpiry
    )
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment