Created
November 9, 2019 05:01
-
-
Save smallufo/04bffb65535e84ea61c494b2a0dfea84 to your computer and use it in GitHub Desktop.
Hedging By OkHttp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package destiny | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.channels.Channel | |
import kotlinx.coroutines.flow.* | |
import mu.KotlinLogging | |
import okhttp3.OkHttpClient | |
import okhttp3.Request | |
/** | |
* ru.gildor.coroutines:kotlin-coroutines-okhttp:1.0 | |
*/ | |
import ru.gildor.coroutines.okhttp.await | |
import java.net.URLEncoder | |
import kotlin.test.Test | |
interface UrlShorter { | |
suspend fun getShortUrl(longUrl: String): String? | |
} | |
val logger = KotlinLogging.logger {} | |
/** thread safe according to OkHttp document */ | |
val client: OkHttpClient = OkHttpClient.Builder().build() | |
/** | |
Call.await() extension source : | |
https://github.com/gildor/kotlin-coroutines-okhttp/blob/master/src/main/kotlin/ru/gildor/coroutines/okhttp/CallAwait.kt | |
*/ | |
class IsgdImpl : UrlShorter { | |
override suspend fun getShortUrl(longUrl: String): String? { | |
logger.info("running : {}", Thread.currentThread().name) | |
val url = "https://is.gd/create.php?format=simple&url=%s".format(URLEncoder.encode(longUrl, "UTF-8")) | |
val req = Request.Builder().url(url).build() | |
return client.newCall(req).await().use { res -> | |
try { | |
res.body?.string() | |
} catch (e : Throwable) { | |
null | |
} | |
} | |
} | |
} | |
class TinyImpl : UrlShorter { | |
override suspend fun getShortUrl(longUrl: String): String? { | |
logger.info("running : {}", Thread.currentThread().name) | |
val url = "http://tinyurl.com/api-create.php?url=$longUrl" | |
val req = Request.Builder().url(url).build() | |
return client.newCall(req).await().use { res -> | |
try { | |
res.body?.string() | |
} catch (e : Throwable) { | |
null | |
} | |
} | |
} | |
} | |
/** | |
* delays 10 seconds and returns null | |
*/ | |
class DumbImpl : UrlShorter { | |
override suspend fun getShortUrl(longUrl: String): String? { | |
logger.info("running : {}", Thread.currentThread().name) | |
delay(10 * 1000) | |
return null | |
} | |
} | |
/** | |
* returns null immediately | |
*/ | |
class NullImpl : UrlShorter { | |
override suspend fun getShortUrl(longUrl: String): String? { | |
logger.info("running : {}", Thread.currentThread().name) | |
return null | |
} | |
} | |
@ExperimentalCoroutinesApi | |
@FlowPreview | |
class UrlShorterService(private val impls: List<UrlShorter>) { | |
suspend fun getShortUrl(longUrl: String): String { | |
return methodFlowMerge1(longUrl) | |
} | |
/** | |
* OK | |
* | |
* 12:55:32,617 INFO destiny.NullImpl - running : main | |
* 12:55:32,622 INFO destiny.DumbImpl - running : main | |
* 12:55:32,630 INFO destiny.IsgdImpl - running : main | |
* 12:55:32,711 INFO destiny.TinyImpl - running : main | |
* 12:55:33,150 INFO destiny.UrlShorterServiceTest$testHedging$1 - result = http://tinyurl.com/389lo | |
* | |
*/ | |
private suspend fun methodFlowMerge1(longUrl: String): String { | |
return impls.asSequence().asFlow().flatMapMerge(impls.size) { impl -> | |
flow { | |
impl.getShortUrl(longUrl)?.also { | |
emit(it) | |
} | |
} | |
}.first() | |
} | |
/** | |
* OK | |
* | |
* 12:53:36,227 INFO TinyImpl - running : DefaultDispatcher-worker-5 | |
* 12:53:36,229 INFO NullImpl - running : DefaultDispatcher-worker-1 | |
* 12:53:36,229 INFO DumbImpl - running : DefaultDispatcher-worker-2 | |
* 12:53:36,227 INFO IsgdImpl - running : DefaultDispatcher-worker-4 | |
* 12:53:37,135 INFO UrlShorterServiceTest$testHedging$1 - result = http://tinyurl.com/389lo | |
*/ | |
private suspend fun methodFlowMerge2(longUrl: String): String { | |
return impls.asSequence().asFlow().flatMapMerge(impls.size) { impl -> | |
flow { | |
impl.getShortUrl(longUrl)?.also { | |
emit(it) | |
} | |
}.flowOn(Dispatchers.IO) | |
}.first() | |
.also { Dispatchers.IO.cancelChildren() } | |
} | |
/** | |
* OK | |
* | |
* 12:57:22,663 INFO destiny.NullImpl - running : main | |
* 12:57:22,685 INFO destiny.DumbImpl - running : main | |
* 12:57:22,696 INFO destiny.IsgdImpl - running : main | |
* 12:57:22,846 INFO destiny.TinyImpl - running : main | |
* 12:57:23,296 INFO destiny.UrlShorterServiceTest$testHedging$1 - result = http://tinyurl.com/389lo | |
*/ | |
private suspend fun methodChannel(longUrl: String): String { | |
return coroutineScope { | |
val chan = Channel<String?>() | |
impls.forEach { impl -> | |
launch { | |
try { | |
impl.getShortUrl(longUrl).also { chan.send(it) } | |
} catch (e: Exception) { | |
chan.send(null) | |
} | |
} | |
} | |
(1..impls.size).forEach { _ -> | |
chan.receive()?.also { shortUrl -> | |
coroutineContext[Job]!!.cancelChildren() | |
return@coroutineScope shortUrl | |
} | |
} | |
throw Exception("All services failed") | |
} | |
} | |
} | |
@ExperimentalCoroutinesApi | |
@FlowPreview | |
class UrlShorterServiceTest { | |
@Test | |
fun testHedging() { | |
val impls = listOf(NullImpl(), DumbImpl(), IsgdImpl(), TinyImpl()) | |
val service = UrlShorterService(impls) | |
runBlocking { | |
service.getShortUrl("https://www.google.com").also { | |
logger.info("result = {}", it) | |
} | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment