package com.techyourchance.multithreading.demonstrations.purecoroutines | |
import android.util.Log | |
import com.techyourchance.multithreading.DefaultConfiguration | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.channels.Channel | |
import java.util.concurrent.atomic.AtomicInteger | |
/**
 * Producer-consumer benchmark built only from coroutines and a buffered [Channel].
 *
 * A run launches [NUM_OF_MESSAGES] producer coroutines, each of which sends a single number
 * into a channel with capacity [BLOCKING_QUEUE_CAPACITY], and [NUM_OF_MESSAGES] consumer
 * coroutines, each of which receives a single number and bumps a shared counter.
 *
 * The counter and the channel are shared by all runs of one instance, so overlapping calls to
 * [startBenchmark] on the same instance would mix their messages and counts.
 */
class ProducerConsumerCoroutinesOnlyBenchmark {

    /**
     * Outcome of a single benchmark run.
     *
     * @property executionTime elapsed time of the run, in milliseconds
     * @property numOfReceivedMessages number of messages the consumers received during the run
     */
    class Result(val executionTime: Long, val numOfReceivedMessages: Int)

    private val numOfReceivedMessages: AtomicInteger = AtomicInteger(0)

    private val channel = Channel<Int>(BLOCKING_QUEUE_CAPACITY)

    /**
     * Runs the benchmark once on [Dispatchers.IO] and suspends until every producer and
     * consumer coroutine has completed.
     *
     * @return the elapsed time in milliseconds and the number of messages received in this run
     */
    suspend fun startBenchmark(): Result = withContext(Dispatchers.IO) {
        // Start every run from zero; otherwise a second run on the same instance would
        // report the accumulated count of all previous runs.
        numOfReceivedMessages.set(0)

        // nanoTime is monotonic, so the measurement is not skewed by wall-clock adjustments.
        val startNanos = System.nanoTime()

        // coroutineScope returns only after both launchers and all coroutines they start
        // have finished; producers and consumers are still launched concurrently.
        coroutineScope {
            launch { produceNumbers() }
            launch { consumeNumbers() }
        }

        Result(
            (System.nanoTime() - startNanos) / NANOS_PER_MILLI,
            numOfReceivedMessages.get()
        )
    }

    /**
     * Launches [NUM_OF_MESSAGES] producer coroutines in the receiver scope. Each one waits for
     * the configured producer delay and then sends its own index into the channel.
     *
     * Returns right after launching; the producers complete as children of the receiver scope.
     */
    fun CoroutineScope.produceNumbers() {
        repeat(NUM_OF_MESSAGES) { producerId ->
            launch(Dispatchers.IO) {
                Log.d("Producer", "producer $producerId started; ")
                delay(DefaultConfiguration.DEFAULT_PRODUCER_DELAY_MS.toLong())
                channel.send(producerId)
            }
        }
    }

    /**
     * Launches [NUM_OF_MESSAGES] consumer coroutines in the receiver scope. Each one receives
     * a single message from the channel (the value itself is discarded) and increments the
     * received-messages counter.
     *
     * Returns right after launching; the consumers complete as children of the receiver scope.
     */
    fun CoroutineScope.consumeNumbers() {
        repeat(NUM_OF_MESSAGES) { consumerId ->
            launch(Dispatchers.IO) {
                Log.d("Consumer", "consumer $consumerId started; ")
                channel.receive()
                numOfReceivedMessages.incrementAndGet()
            }
        }
    }

    companion object {
        private const val NUM_OF_MESSAGES = DefaultConfiguration.DEFAULT_NUM_OF_MESSAGES
        private const val BLOCKING_QUEUE_CAPACITY = DefaultConfiguration.DEFAULT_BLOCKING_QUEUE_SIZE
        private const val NANOS_PER_MILLI = 1_000_000L
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment