Created
April 3, 2018 08:25
-
-
Save elizarov/34150213209158d0dce0db62d9c0a20f to your computer and use it in GitHub Desktop.
Channel operator to send distinct time elements in a specific time window
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.experimental.* | |
import kotlinx.coroutines.experimental.channels.* | |
import kotlinx.coroutines.experimental.selects.* | |
import java.util.concurrent.* | |
// operator | |
fun <T> ReceiveChannel<T>.distinctInTimeWindow(time: Long, unit: TimeUnit): ReceiveChannel<T> = produce { | |
require(time > 0) | |
consume { | |
val source = this@distinctInTimeWindow | |
val set = LinkedHashSet<T>() | |
var deadlineNanos = System.nanoTime() | |
var done = false | |
while (!done) { // loop for all windows while source is not over | |
deadlineNanos += unit.toNanos(time) // next time window's deadline | |
var timedOut = false | |
while (!timedOut) { // compose a window while not timed out | |
val remainingNanos = deadlineNanos - System.nanoTime() | |
if (remainingNanos <= 0) break // this window is over | |
try { | |
timedOut = select<Boolean> { | |
onTimeout(remainingNanos, TimeUnit.NANOSECONDS) { | |
true // timed out | |
} | |
source.onReceive { | |
set.add(it) | |
false // no timeout -- continue window | |
} | |
} | |
} catch (e: ClosedReceiveChannelException) { | |
done = true | |
break // expected at the end of source, just break out of window's loop | |
} | |
} | |
// send composed window | |
set.forEach { send(it) } | |
set.clear() | |
} | |
} | |
} | |
// test code | |
fun main(args: Array<String>) = runBlocking { | |
val msgs = listOf("A", "B", "A", "B", "A", "B", "A", "B", "A", "B") | |
// send them every 100ms | |
val source = produce { | |
msgs.forEach { | |
send(it) | |
delay(100) | |
} | |
} | |
// distinct in 310 ms windows: | |
source | |
.distinctInTimeWindow(310, TimeUnit.MILLISECONDS) | |
.consumeEach { println("${System.currentTimeMillis()}: $it") } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment