Skip to content

Instantly share code, notes, and snippets.

@elizarov
Created April 3, 2018 08:25
Show Gist options
  • Save elizarov/34150213209158d0dce0db62d9c0a20f to your computer and use it in GitHub Desktop.
Save elizarov/34150213209158d0dce0db62d9c0a20f to your computer and use it in GitHub Desktop.
Channel operator to send distinct time elements in a specific time window
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*
import java.util.concurrent.*
// operator
fun <T> ReceiveChannel<T>.distinctInTimeWindow(time: Long, unit: TimeUnit): ReceiveChannel<T> = produce {
require(time > 0)
consume {
val source = this@distinctInTimeWindow
val set = LinkedHashSet<T>()
var deadlineNanos = System.nanoTime()
var done = false
while (!done) { // loop for all windows while source is not over
deadlineNanos += unit.toNanos(time) // next time window's deadline
var timedOut = false
while (!timedOut) { // compose a window while not timed out
val remainingNanos = deadlineNanos - System.nanoTime()
if (remainingNanos <= 0) break // this window is over
try {
timedOut = select<Boolean> {
onTimeout(remainingNanos, TimeUnit.NANOSECONDS) {
true // timed out
}
source.onReceive {
set.add(it)
false // no timeout -- continue window
}
}
} catch (e: ClosedReceiveChannelException) {
done = true
break // expected at the end of source, just break out of window's loop
}
}
// send composed window
set.forEach { send(it) }
set.clear()
}
}
}
// test code
fun main(args: Array<String>) = runBlocking {
val msgs = listOf("A", "B", "A", "B", "A", "B", "A", "B", "A", "B")
// send them every 100ms
val source = produce {
msgs.forEach {
send(it)
delay(100)
}
}
// distinct in 310 ms windows:
source
.distinctInTimeWindow(310, TimeUnit.MILLISECONDS)
.consumeEach { println("${System.currentTimeMillis()}: $it") }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment