Skip to content

Instantly share code, notes, and snippets.

@elizarov
Created July 3, 2020 09:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save elizarov/63f6d6eb8542e052cc74091d1faab367 to your computer and use it in GitHub Desktop.
Save elizarov/63f6d6eb8542e052cc74091d1faab367 to your computer and use it in GitHub Desktop.
Flow.asPublisher stress-test with concurrent request/onNext
import kotlin.coroutines.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.*
import org.reactivestreams.*
import java.util.concurrent.*
import java.util.concurrent.atomic.*
import kotlin.random.*
/**
 * Prints [s] to standard output on its own line, prefixed with the name of the
 * calling thread in square brackets.
 */
fun log(s: String) {
    val threadName = Thread.currentThread().name
    println("[$threadName] $s")
}
/**
 * Stress-test driver: subscribes to `mtFlow().asPublisher()` and issues `request(n)` calls
 * from [pool] threads while values are being delivered to `onNext`, logging every value that
 * does not match the expected consecutive sequence.
 *
 * After subscribing, the calling thread logs the [nextValue] and `expectedValue` counters once
 * per second for 100 iterations and then cancels the subscription (if one was stored).
 */
fun main() {
// Stored by onSubscribe; read by the request tasks on the pool and by the final cancel below.
var subscription: Subscription? = null
// The value the next onNext call is expected to carry (last received value + 1, starting at 0).
val expectedValue = AtomicLong(0)
// Running sum of all batch sizes that have been scheduled for request() so far.
val requestedTill = AtomicLong(0)
// Inclusive bounds for the random size of each request batch.
val minBatch = 5L
val maxBatch = 20L
mtFlow().asPublisher().subscribe(object : Subscriber<Long> {
override fun onComplete() {
log("onComplete()")
}
// Remembers the subscription and schedules the initial round of requests.
override fun onSubscribe(sub: Subscription?) {
log("onSubscribe($sub)")
subscription = sub
requestBatches()
}
// Keeps scheduling randomly sized request() calls until the total scheduled demand is at
// least maxBatch ahead of the next expected value.
private fun requestBatches() {
while (requestedTill.get() < expectedValue.get() + maxBatch) {
val nextBatchSize = Random.nextLong(minBatch..maxBatch)
// The counter is bumped before the request is actually issued on the pool.
requestedTill.addAndGet(nextBatchSize)
// request() is deliberately issued from a pool thread rather than the calling thread.
pool.execute {
// NOTE(review): `!!` assumes onSubscribe already stored a non-null subscription;
// it throws NullPointerException otherwise.
subscription!!.request(nextBatchSize)
}
}
}
override fun onNext(value: Long) {
// Log progress once every 200_000 values.
if (value % 200_000 == 0L) log("onNext($value)")
// Values are expected to arrive as 0, 1, 2, ...; report (but do not abort on) any mismatch.
if (value != expectedValue.get()) {
log("UNEXPECTED VALUE: $value")
}
// Re-synchronise on the value actually received, so one mismatch is not reported repeatedly.
val nextExpected = value + 1
expectedValue.set(nextExpected)
// Top up demand once fewer than minBatch values remain scheduled beyond the next expected one.
if (requestedTill.get() < expectedValue.get() + minBatch) requestBatches()
}
override fun onError(ex: Throwable?) {
log("onError($ex)")
}
})
// Let the test run for 100 one-second ticks, reporting producer and consumer progress.
repeat(100) {
Thread.sleep(1000)
log("nextValue = ${nextValue.get()}, expectedValue = ${expectedValue.get()}")
}
// Safe call: does nothing if onSubscribe never stored a subscription.
subscription?.cancel()
}
/**
 * Builds a flow that loops forever, suspending in [aWait] on each iteration and
 * emitting the value it returns. The loop has no exit condition of its own.
 */
fun mtFlow(): Flow<Long> {
    return flow {
        while (true) {
            val next = aWait()
            emit(next)
        }
    }
}
/**
 * Suspends the caller and resumes it from a [pool] thread.
 *
 * The resumed value is taken from [nextValue] via `getAndIncrement()` at the moment
 * the pool task runs, not when this function is called.
 */
suspend fun aWait(): Long = suspendCancellableCoroutine { continuation ->
    pool.execute {
        val value = nextValue.getAndIncrement()
        continuation.resume(value)
    }
}
/** Fixed pool of 8 daemon threads shared by the request() tasks and the [aWait] resumptions. */
val pool = Executors.newFixedThreadPool(8, ThreadFactory { task ->
    val worker = Thread(task)
    worker.isDaemon = true
    worker
})

/** Counter that supplies the flow's values; starts at 0 and is advanced by [aWait]. */
val nextValue = AtomicLong(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment