-
-
Save josephlbarnett/1ebd20d5c3cb3684238a6d4d6f81574c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.trib3.graphql.websocket | |
import kotlinx.coroutines.CoroutineDispatcher | |
import kotlinx.coroutines.CoroutineScope | |
import kotlinx.coroutines.Dispatchers | |
import kotlinx.coroutines.ExperimentalCoroutinesApi | |
import kotlinx.coroutines.FlowPreview | |
import kotlinx.coroutines.channels.Channel | |
import kotlinx.coroutines.channels.sendBlocking | |
import kotlinx.coroutines.flow.collect | |
import kotlinx.coroutines.flow.consumeAsFlow | |
import kotlinx.coroutines.flow.flow | |
import kotlinx.coroutines.flow.flowOn | |
import kotlinx.coroutines.launch | |
import kotlinx.coroutines.reactive.asFlow | |
import kotlinx.coroutines.reactive.asPublisher | |
import kotlinx.coroutines.slf4j.MDCContext | |
import mu.KotlinLogging | |
import org.testng.annotations.Test | |
import java.util.concurrent.CountDownLatch | |
private val log = KotlinLogging.logger {} | |
class Processor(val channel: Channel<String>, val latch: CountDownLatch, dispatcher: CoroutineDispatcher) : | |
CoroutineScope by CoroutineScope(dispatcher) { | |
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) | |
suspend fun handleMessage(message: String) { | |
log.info("Got message: $message") | |
when (message) { | |
"start" -> { | |
launch(MDCContext()) { | |
flow { | |
while(true) { | |
emit("data") | |
Thread.sleep(20) | |
} | |
} | |
// uncommenting the next line makes the first emission "fast" | |
// leaving it out makes it "slow" | |
// Note that it needs to be before the .asPublisher().asFlow() step | |
// to have an effect. Note also that skipping the .asPublisher().asFlow() | |
// also avoids the slowness. | |
// .flowOn(Dispatchers.IO) | |
.asPublisher() | |
.asFlow() | |
.collect { | |
channel.send(it) | |
} | |
} | |
} | |
"data" -> { | |
latch.countDown() | |
} | |
} | |
} | |
} | |
class DelayTest { | |
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) | |
@Test | |
fun testDataTiming() { | |
val channel = Channel<String>() | |
val flow = channel.consumeAsFlow() | |
val latch = CountDownLatch(1) | |
val processor = Processor(channel, latch, Dispatchers.Default) | |
processor.launch { | |
flow.collect { | |
processor.handleMessage(it) | |
} | |
} | |
channel.sendBlocking("start") | |
log.info("awaiting collection") | |
val start = System.currentTimeMillis() | |
latch.await() | |
log.info("await took ${System.currentTimeMillis() - start}") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment