Skip to content

Instantly share code, notes, and snippets.

@kas-kad
Last active October 9, 2017 11:54
Show Gist options
  • Save kas-kad/7cb19dd97b7584953522778b9dec8776 to your computer and use it in GitHub Desktop.
Save kas-kad/7cb19dd97b7584953522778b9dec8776 to your computer and use it in GitHub Desktop.
A synchronous function that iterates over a stream gradually, pausing between elements so that the whole traversal is spread across the given timeInterval (in milliseconds).
/**
 * Synchronously walks [stream], invoking [action] on every element and sleeping between
 * elements so that the whole traversal is spread over roughly [timeInterval] milliseconds.
 *
 * Pacing is deadline based: element number `k` (1-based) is expected to be finished
 * `k * timeInterval / streamElementsCount` after the loop started. After each element the
 * current thread sleeps until that deadline; if the loop is already behind schedule it does
 * not sleep, and following elements run back to back until the schedule is caught up.
 * All bookkeeping is done in nanoseconds so that a per-element budget below one millisecond
 * is still honoured instead of being truncated to zero.
 *
 * Progress is reported on standard output via `println`.
 *
 * @param timeInterval total time, in milliseconds, the traversal should take.
 * @param stream elements to process; it is consumed by this call and not closed here.
 * @param streamElementsCount number of elements the caller expects [stream] to contain;
 *   used only to derive the per-element time budget. When it is zero or negative the
 *   stream is not touched at all.
 * @param action callback invoked once per element, on the calling thread.
 * @throws InterruptedException if the calling thread is interrupted while sleeping.
 */
fun <T> gradualLoop(timeInterval: Long, stream: Stream<T>, streamElementsCount: Long, action: (T) -> Unit) {
    println("total elements: ${streamElementsCount}")
    if (streamElementsCount <= 0) return

    val nanosPerMilli = 1_000_000L
    // toNanos saturates at Long.MAX_VALUE instead of overflowing on a huge interval.
    val totalNanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(timeInterval)
    val budgetNanos = totalNanos / streamElementsCount // time slot reserved for one element
    println("max iteration duration must be: ${budgetNanos / nanosPerMilli} ms")

    val loopStartNanos = System.nanoTime()
    var processed = 0L
    stream.forEach { element ->
        val iterationStartNanos = System.nanoTime()
        action(element)
        val actionNanos = maxOf(System.nanoTime() - iterationStartNanos, 0L)
        println("iteration took ${actionNanos / nanosPerMilli} ms")

        processed++
        // Compare against the absolute schedule rather than per-iteration durations, so that
        // logging time, sleep overshoot and rounding cannot accumulate into drift.
        val deadlineNanos = budgetNanos * processed
        val behindByNanos = (System.nanoTime() - loopStartNanos) - deadlineNanos
        val sleepNanos = maxOf(-behindByNanos, 0L)
        val overdueNanos = maxOf(behindByNanos, 0L)

        // Always called, even with a zero duration, so a pending interrupt is still observed.
        Thread.sleep(sleepNanos / nanosPerMilli, (sleepNanos % nanosPerMilli).toInt())
        println("slept for ${sleepNanos / nanosPerMilli} ms, total overdue: ${overdueNanos / nanosPerMilli} ms")
        println("\n")
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment