Skip to content

Instantly share code, notes, and snippets.

@upeter
Last active February 1, 2024 20:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save upeter/05529866459c7d49eefab544ab30165b to your computer and use it in GitHub Desktop.
Save upeter/05529866459c7d49eefab544ab30165b to your computer and use it in GitHub Desktop.
Test demonstrating that Periodik blocks Dispatcher Thread
package dev.akif.periodik
import dev.akif.periodik
import kotlinx.coroutines.*
import org.junit.jupiter.api.Timeout
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import kotlin.random.Random
import kotlin.reflect.KProperty
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
class PeriodikBrokenTest {
@Test
@Timeout(5)
fun `should show that simple ReloadDelegate implementation does not block Dispatcher Thread `() = runBlocking {
val service = Service(singleThreadDispather)
val results = (1..3).map {
service.ratesWorking.also {
log.info("Rates: ${service.ratesWorking}")
delay(1000)
}
}
//expect rates to be updated every 500 millis, so fetching them 3 times every second, they should be changed 3 times
assertEquals(results.toSet().size, 3)
}
@Test
@Timeout(5)
fun `should show that periodiek blocks Dispatcher Thread permanently`() = runBlocking {
val service = Service(singleThreadDispather)
val results = (1..3).map {
service.ratesWorking.also {
log.info("Rates: ${service.ratesWithPeriodiek}")
delay(1000)
}
}
//expect rates to be updated every 500 millis, so fetching them 3 times every second, they should be changed 3 times
assertEquals(results.toSet().size, 3)
}
companion object {
private val log: Logger = LoggerFactory.getLogger(PeriodikBrokenTest::class.java)
val singleThreadDispather = newSingleThreadContext("single-thread")
}
}
/**
* Working multithreaded minimal implementation
*/
class ReloadDelegate<T>(
dispatcher: CoroutineDispatcher = Dispatchers.Default,
val delay: Duration,
val updateFun: suspend () -> T
) {
//for the initial, which must be fetched right away, Dispatchers.IO is a safe choice, could also be made configurable
private var backingField: Result<T> = runCatching { runBlocking(Dispatchers.IO) { updateFun() } }
private val schedulerScope = CoroutineScope(dispatcher)
init {
scheduleTasks()
}
private fun scheduleTasks() {
schedulerScope.launch {
delay(delay)
backingField = runCatching { updateFun()}
scheduleTasks()
}
}
operator fun getValue(thisRef: Any, property: KProperty<*>): T = backingField.getOrThrow()
}
/**
* Dummy implementation using Periodik and simple delegate
*/
class Service(coroutineDispatcher: CoroutineDispatcher) {
val rateApi = RateApi()
val ratesWorking: Map<String, Map<String, Double>> by ReloadDelegate(
coroutineDispatcher,
500.milliseconds,
::fetchParallel
)
val ratesWithPeriodiek: Map<String, Map<String, Double>> by periodik()
.on(Schedule.every(500.milliseconds))
.initializeLazily()
.loggingWithSlf4j()
.buildSuspending(coroutineDispatcher) {
fetchParallel()
}
suspend fun fetchParallel() = coroutineScope {
val ratesForAll =
supportedCurrencies.map { c ->
async {
val ratesForCurrency = rateApi.ratesFor(c).filterKeys { supportedCurrencies.contains(it) }
c to ratesForCurrency
}
}.awaitAll()
ratesForAll.associateBy(
keySelector = { (currency, _) -> currency },
valueTransform = { (_, ratesForCurrency) -> ratesForCurrency }
)
}
suspend fun get(from: String, to: String): ExchangeRate? =
when {
from == to -> ExchangeRate(from, to, 1.0)
else -> {
log.info("Getting exchange rate from $from to $to")
ratesWithPeriodiek[from]?.let { it[to] }?.let { ExchangeRate(from, to, it) }
}
}
companion object {
private val log: Logger = LoggerFactory.getLogger(Service::class.java)
val supportedCurrencies = setOf("USD", "EUR", "TRY", "GBP")
}
}
class RateApi {
companion object {
private val log: Logger = LoggerFactory.getLogger(RateApi::class.java)
private const val lower: Double = 0.2
private const val upper: Double = 50.0
}
suspend fun ratesFor(currency: String): Map<String, Double> {
log.info("Getting rates for {}", currency)
delay(300.milliseconds)
return mapOf(
"EUR" to Random.nextDouble(lower, upper),
"USD" to Random.nextDouble(lower, upper),
"TRY" to Random.nextDouble(lower, upper)
)
}
}
data class ExchangeRate(
val from: String,
val to: String,
val rate: Double
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment