Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
package com.techyourchance.multithreading.demonstrations.purecoroutines
import android.util.Log
import com.techyourchance.multithreading.DefaultConfiguration
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.atomic.AtomicInteger
/**
 * Producer-consumer benchmark built only from coroutines and a [Channel].
 *
 * A run launches [NUM_OF_MESSAGES] producer coroutines, each of which sends one message into a
 * shared channel, and [NUM_OF_MESSAGES] consumer coroutines, each of which receives one message.
 *
 * The channel and the received-messages counter are per-instance state that is shared between
 * runs, so runs on the same instance must not overlap.
 */
class ProducerConsumerCoroutinesOnlyBenchmark {

    /**
     * Outcome of a single benchmark run.
     *
     * @property executionTime elapsed time of the run, in milliseconds
     * @property numOfReceivedMessages number of messages the consumers received during the run
     */
    class Result(val executionTime: Long, val numOfReceivedMessages: Int)

    // Incremented concurrently by consumer coroutines, hence atomic.
    private val numOfReceivedMessages: AtomicInteger = AtomicInteger(0)

    // Created with BLOCKING_QUEUE_CAPACITY as its capacity argument
    // (presumably a positive buffer size; confirm against DefaultConfiguration).
    private val channel = Channel<Int>(BLOCKING_QUEUE_CAPACITY)

    /**
     * Runs the benchmark once and suspends until every producer and consumer has finished.
     *
     * @return the elapsed time in milliseconds and the number of messages received in this run
     */
    suspend fun startBenchmark(): Result = withContext(Dispatchers.IO) {
        // Reset per-run state; without this a second run on the same instance would report
        // the cumulative count of all previous runs.
        numOfReceivedMessages.set(0)

        // nanoTime() is monotonic; currentTimeMillis() is wall-clock time and can jump when the
        // system clock is adjusted, which would corrupt the measured duration.
        val startNanos = System.nanoTime()

        val deferredProducers = async {
            produceNumbers()
        }
        val deferredConsumers = async {
            consumeNumbers()
        }

        // Each Deferred completes only after the coroutines launched inside it have completed.
        awaitAll(deferredConsumers, deferredProducers)

        Result(
            (System.nanoTime() - startNanos) / NANOS_PER_MILLI,
            numOfReceivedMessages.get()
        )
    }

    /**
     * Launches [NUM_OF_MESSAGES] producer coroutines as children of the receiver scope.
     *
     * Each producer logs its start, suspends for the configured producer delay and then sends
     * its own index into the channel. This function returns as soon as the coroutines are
     * launched; it does not wait for them.
     */
    fun CoroutineScope.produceNumbers() {
        repeat(NUM_OF_MESSAGES) { producerId ->
            launch(Dispatchers.IO) {
                Log.d("Producer", "producer $producerId started; ")
                delay(DefaultConfiguration.DEFAULT_PRODUCER_DELAY_MS.toLong())
                channel.send(producerId)
            }
        }
    }

    /**
     * Launches [NUM_OF_MESSAGES] consumer coroutines as children of the receiver scope.
     *
     * Each consumer logs its start, receives exactly one message from the channel (the value
     * itself is discarded) and increments the received-messages counter. This function returns
     * as soon as the coroutines are launched; it does not wait for them.
     */
    fun CoroutineScope.consumeNumbers() {
        repeat(NUM_OF_MESSAGES) { consumerId ->
            launch(Dispatchers.IO) {
                Log.d("Consumer", "consumer $consumerId started; ")
                channel.receive()
                numOfReceivedMessages.incrementAndGet()
            }
        }
    }

    companion object {
        private const val NUM_OF_MESSAGES = DefaultConfiguration.DEFAULT_NUM_OF_MESSAGES
        private const val BLOCKING_QUEUE_CAPACITY = DefaultConfiguration.DEFAULT_BLOCKING_QUEUE_SIZE
        private const val NANOS_PER_MILLI = 1_000_000L
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.