@Bill
Created July 22, 2019 22:50
CoroutineScope.timeWindow(producer: ReceiveChannel<T>, delayMillis: Long): ReceiveChannel<List<T>>
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.channels.ticker
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import java.time.LocalDateTime

// Emits the current timestamp every 300 ms until cancelled.
fun CoroutineScope.producer() = produce<String> {
    while (true) {
        delay(300)
        send(LocalDateTime.now().toString())
    }
}

// Buffers everything received from `producer` and emits the buffer as a list every `delayMillis` ms.
// Note: ticker() is marked @ObsoleteCoroutinesApi in kotlinx.coroutines.
fun <T> CoroutineScope.timeWindow(producer: ReceiveChannel<T>, delayMillis: Long) = produce<List<T>> {
    val ticker = ticker(delayMillis)
    var seen = mutableListOf<T>()
    try {
        while (true) {
            select<Unit> {
                // <Unit> means that this select expression does not produce any result
                producer.onReceive { value ->
                    // first select clause: add the value to the current window
                    seen.add(value)
                }
                ticker.onReceive {
                    // second select clause: the window has elapsed, so emit it and start a new one
                    println(" sending window: $seen")
                    this@produce.send(seen)
                    seen = mutableListOf()
                }
            }
        }
    } finally {
        // the ticker channel is not a child of this scope, so cancel it explicitly
        ticker.cancel()
    }
}

fun main() = runBlocking<Unit> {
    val windows = timeWindow(producer(), 900)
    repeat(5) {
        val window = windows.receive()
        println("received window: $window")
    }
    // stop the producer and the windowing coroutine so runBlocking can return
    coroutineContext.cancelChildren()
}

Bill commented Jul 22, 2019

output:

 sending window: [2019-07-22T15:47:47.470512, 2019-07-22T15:47:47.778584]
received window: [2019-07-22T15:47:47.470512, 2019-07-22T15:47:47.778584]
 sending window: [2019-07-22T15:47:48.079849, 2019-07-22T15:47:48.384814, 2019-07-22T15:47:48.688146]
received window: [2019-07-22T15:47:48.079849, 2019-07-22T15:47:48.384814, 2019-07-22T15:47:48.688146]
 sending window: [2019-07-22T15:47:48.991578, 2019-07-22T15:47:49.294997, 2019-07-22T15:47:49.598437]
received window: [2019-07-22T15:47:48.991578, 2019-07-22T15:47:49.294997, 2019-07-22T15:47:49.598437]
 sending window: [2019-07-22T15:47:49.901294, 2019-07-22T15:47:50.206251, 2019-07-22T15:47:50.507263]
received window: [2019-07-22T15:47:49.901294, 2019-07-22T15:47:50.206251, 2019-07-22T15:47:50.507263]
 sending window: [2019-07-22T15:47:50.811459, 2019-07-22T15:47:51.114972, 2019-07-22T15:47:51.420586]
received window: [2019-07-22T15:47:50.811459, 2019-07-22T15:47:51.114972, 2019-07-22T15:47:51.420586]

Process finished with exit code 0
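
Why the first window holds two timestamps and the others three: judging by the timestamps above, the producer emits roughly every 304 ms rather than exactly 300, so its third value arrives just after the first 900 ms tick and lands in the second window. Below is a rough model of that arithmetic with no coroutines involved. `windowSizes` and the 304 ms gap are assumptions made for illustration; they are not part of the gist's code.

```kotlin
// Hypothetical helper: counts how many emissions fall into each fixed-size window.
// Assumes emissions happen at gapMillis, 2 * gapMillis, ... and ticks at multiples of windowMillis.
fun windowSizes(gapMillis: Long, windowMillis: Long, windows: Int): List<Int> {
    val sizes = MutableList(windows) { 0 }
    var t = gapMillis
    while (t < windowMillis * windows) {
        // an emission at time t is flushed by the first tick strictly after t
        sizes[(t / windowMillis).toInt()]++
        t += gapMillis
    }
    return sizes
}

fun main() {
    // 304 ms approximates the spacing seen in the output above (an estimate, not a measurement)
    println(windowSizes(gapMillis = 304, windowMillis = 900, windows = 5)) // prints [2, 3, 3, 3, 3]
}
```

Under this model the leftover few milliseconds per emission keep accumulating, so a long enough run should eventually produce another two-element window.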