Skip to content

Instantly share code, notes, and snippets.

@Bill
Created July 23, 2019 23:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Bill/09dede50c1a23ad90675e59298c9bd9b to your computer and use it in GitHub Desktop.
Save Bill/09dede50c1a23ad90675e59298c9bd9b to your computer and use it in GitHub Desktop.
use CompletableDeferred to do request-response style messaging to a coroutine
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.channels.ticker
import kotlinx.coroutines.selects.select
import java.time.LocalDateTime
fun log2(msg:String) {
println("$msg in thread ${Thread.currentThread()}")
}
fun CoroutineScope.accumulateAndSnapshot(
content: ReceiveChannel<String>,
snapshotRequests: ReceiveChannel<CompletableDeferred<Collection<String>>>):
Job = launch {
log2("starting accumulateAndSnapshot")
val gatheredContent = mutableListOf<String>()
val clearContent = ticker(50)
while(true) {
select<Unit> {
content.onReceive {
log2("received content: $it")
gatheredContent.add(it)
}
snapshotRequests.onReceive {
log2("received snapshot request")
assert(it.complete(gatheredContent.toList()) == true)
}
clearContent.onReceive {
log2("clearing content")
gatheredContent.clear()
}
}
}
}
fun main() = runBlocking<Unit> {
val contentProducer = produce {
while(true) {
delay(10)
val content = LocalDateTime.now().toString()
log2("sending content: $content")
send(content)
}
}
val snapshotter = produce {
while(true) {
delay(100)
val snapshotDeferred = CompletableDeferred<Collection<String>>()
log2("sending snapshot request (deferred)")
send(snapshotDeferred)
val x = snapshotDeferred.await()
println("got deferred value: $x")
}
}
val thingy = accumulateAndSnapshot(contentProducer, snapshotter)
delay(10000)
coroutineContext.cancelChildren()
}
@Bill
Copy link
Author

Bill commented Jul 23, 2019

output like:

starting accumulateAndSnapshot in thread Thread[main,5,main]
sending content: 2019-07-23T16:43:52.446769 in thread Thread[main,5,main]
received content: 2019-07-23T16:43:52.446769 in thread Thread[main,5,main]
sending content: 2019-07-23T16:43:52.462025 in thread Thread[main,5,main]
received content: 2019-07-23T16:43:52.462025 in thread Thread[main,5,main]
sending content: 2019-07-23T16:43:52.472328 in thread Thread[main,5,main]
received content: 2019-07-23T16:43:52.472328 in thread Thread[main,5,main]
clearing content in thread Thread[main,5,main]
sending content: 2019-07-23T16:43:52.483193 in thread Thread[main,5,main]
received content: 2019-07-23T16:43:52.483193 in thread Thread[main,5,main]
sending content: 2019-07-23T16:43:52.494229 in thread Thread[main,5,main]
received content: 2019-07-23T16:43:52.494229 in thread Thread[main,5,main]
sending content: 2019-07-23T16:43:52.505235 in thread Thread[main,5,main]
received content: 2019-07-23T16:43:52.505235 in thread Thread[main,5,main]
sending snapshot request (deferred) in thread Thread[main,5,main]
received snapshot request in thread Thread[main,5,main]
clearing content in thread Thread[main,5,main]
sending content: 2019-07-23T16:43:52.526822 in thread Thread[main,5,main]
got deferred value: [2019-07-23T16:43:52.483193, 2019-07-23T16:43:52.494229, 2019-07-23T16:43:52.505235]
received content: 2019-07-23T16:43:52.526822 in thread Thread[main,5,main]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment