Skip to content

Instantly share code, notes, and snippets.

@josephlbarnett
Last active March 11, 2020 17:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save josephlbarnett/1ebd20d5c3cb3684238a6d4d6f81574c to your computer and use it in GitHub Desktop.
Save josephlbarnett/1ebd20d5c3cb3684238a6d4d6f81574c to your computer and use it in GitHub Desktop.
package com.trib3.graphql.websocket
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.sendBlocking
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.launch
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.slf4j.MDCContext
import mu.KotlinLogging
import org.testng.annotations.Test
import java.util.concurrent.CountDownLatch
private val log = KotlinLogging.logger {}
class Processor(val channel: Channel<String>, val latch: CountDownLatch, dispatcher: CoroutineDispatcher) :
CoroutineScope by CoroutineScope(dispatcher) {
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun handleMessage(message: String) {
log.info("Got message: $message")
when (message) {
"start" -> {
launch(MDCContext()) {
flow {
while(true) {
emit("data")
Thread.sleep(20)
}
}
// uncommenting the next line makes the first emission "fast"
// leaving it out makes it "slow"
// Note that it needs to be before the .asPublisher().asFlow() step
// to have an effect. Note also that skipping the .asPublisher().asFlow()
// also avoids the slowness.
// .flowOn(Dispatchers.IO)
.asPublisher()
.asFlow()
.collect {
channel.send(it)
}
}
}
"data" -> {
latch.countDown()
}
}
}
}
class DelayTest {
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
@Test
fun testDataTiming() {
val channel = Channel<String>()
val flow = channel.consumeAsFlow()
val latch = CountDownLatch(1)
val processor = Processor(channel, latch, Dispatchers.Default)
processor.launch {
flow.collect {
processor.handleMessage(it)
}
}
channel.sendBlocking("start")
log.info("awaiting collection")
val start = System.currentTimeMillis()
latch.await()
log.info("await took ${System.currentTimeMillis() - start}")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment