Created
November 8, 2019 15:48
-
-
Save smallufo/64e18dc1190a1eaeed01759789f04087 to your computer and use it in GitHub Desktop.
Hedging with blocking http client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package destiny | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.channels.Channel | |
import kotlinx.coroutines.flow.* | |
import mu.KotlinLogging | |
import org.apache.http.client.fluent.Request | |
import java.net.URLEncoder | |
import kotlin.test.Test | |
/**
 * A URL-shortening backend.
 *
 * Implementations signal "no result" by returning `null`, which lets a caller
 * query several backends and keep the first non-null answer.
 */
interface UrlShorter {

    /**
     * Produces a shortened form of [longUrl].
     *
     * @param longUrl the URL to shorten
     * @return the short URL, or `null` when this backend has no result
     */
    suspend fun getShortUrl(longUrl: String): String?
}
// File-level logger created through mu.KotlinLogging; every class in this file logs through it.
val logger = KotlinLogging.logger {} | |
/**
 * [UrlShorter] backed by the is.gd `create.php` endpoint (`format=simple`).
 *
 * The request is issued with the blocking Apache fluent client, so it is
 * executed inside `withContext(Dispatchers.IO)`.
 */
class IsgdImpl : UrlShorter {

    /**
     * Asks is.gd for a short URL.
     *
     * @param longUrl the URL to shorten; it is URL-encoded (UTF-8) before being
     *   placed in the query string
     * @return the response body as a string, or `null` when the request fails
     */
    override suspend fun getShortUrl(longUrl: String): String? {
        logger.info("running : {}", Thread.currentThread().name)
        val url = "https://is.gd/create.php?format=simple&url=%s".format(URLEncoder.encode(longUrl, "UTF-8"))
        return withContext(Dispatchers.IO) {
            logger.info("running Dispatchers.IO : {}", Thread.currentThread().name)
            try {
                Request.Get(url).execute().returnContent().asString().also {
                    logger.info("returning {}", it)
                }
            } catch (e: CancellationException) {
                // Cancellation must never be turned into a "no result" answer.
                throw e
            } catch (e: Exception) {
                // Best effort: a failing backend yields null, but the cause is
                // logged instead of being swallowed. Errors (OOM etc.) propagate.
                logger.warn("is.gd request failed : {}", url, e)
                null
            }
        }
    }
}
/**
 * [UrlShorter] backed by the tinyurl.com `api-create.php` endpoint.
 *
 * The request is issued with the blocking Apache fluent client, so it is
 * executed inside `withContext(Dispatchers.IO)`.
 */
class TinyImpl : UrlShorter {

    /**
     * Asks tinyurl.com for a short URL.
     *
     * @param longUrl the URL to shorten; it is URL-encoded (UTF-8) before being
     *   placed in the query string, so characters such as `&`, `#`, `+` or
     *   spaces inside it cannot corrupt the request
     * @return the response body as a string, or `null` when the request fails
     */
    override suspend fun getShortUrl(longUrl: String): String? {
        logger.info("running : {}", Thread.currentThread().name)
        // The long URL is a query-parameter value and therefore has to be escaped.
        val url = "http://tinyurl.com/api-create.php?url=%s".format(URLEncoder.encode(longUrl, "UTF-8"))
        return withContext(Dispatchers.IO) {
            logger.info("running Dispatchers.IO : {}", Thread.currentThread().name)
            try {
                Request.Get(url).execute().returnContent().asString().also {
                    logger.info("returning {}", it)
                }
            } catch (e: CancellationException) {
                // Cancellation must never be turned into a "no result" answer.
                throw e
            } catch (e: Exception) {
                // Best effort: a failing backend yields null, but the cause is
                // logged instead of being swallowed. Errors (OOM etc.) propagate.
                logger.warn("tinyurl request failed : {}", url, e)
                null
            }
        }
    }
}
/**
 * A deliberately slow [UrlShorter]: suspends for ten seconds and then
 * reports "no result" by returning `null`.
 */
class DumbImpl : UrlShorter {

    /**
     * Logs the current thread name, suspends for 10 000 ms and returns `null`.
     *
     * @param longUrl ignored
     * @return always `null`
     */
    override suspend fun getShortUrl(longUrl: String): String? {
        val threadName = Thread.currentThread().name
        logger.info("running : {}", threadName)
        delay(10_000L)
        return null
    }
}
/**
 * A [UrlShorter] that never produces a result: it returns `null` without
 * suspending.
 */
class NullImpl : UrlShorter {

    /**
     * Logs the current thread name and returns `null` right away.
     *
     * @param longUrl ignored
     * @return always `null`
     */
    override suspend fun getShortUrl(longUrl: String): String? {
        val threadName = Thread.currentThread().name
        logger.info("running : {}", threadName)
        return null
    }
}
/**
 * Hedging front-end over several [UrlShorter] implementations: all of them are
 * started concurrently and the first non-null answer wins.
 *
 * Three alternative strategies are kept side by side for comparison;
 * [getShortUrl] currently delegates to [methodFlowMerge1].
 *
 * @param impls the backends to query; `null` answers are treated as "no result"
 */
@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterService(private val impls: List<UrlShorter>) {

    /**
     * Concurrency passed to `flatMapMerge`. That operator rejects values below 1,
     * so an empty [impls] list is mapped to 1; the merged flow is then simply
     * empty and `first()` reports it like any other "no result" case.
     */
    private val mergeConcurrency: Int
        get() = impls.size.coerceAtLeast(1)

    /**
     * Returns the first short URL produced by any backend.
     *
     * @param longUrl the URL to shorten
     * @return the first non-null answer
     * @throws NoSuchElementException when no backend produces a non-null answer
     */
    suspend fun getShortUrl(longUrl: String): String {
        return methodFlowMerge1(longUrl)
    }

    /** Cold flow that emits the answer of [impl] for [longUrl], or nothing when the answer is `null`. */
    private fun attempt(impl: UrlShorter, longUrl: String): Flow<String> = flow {
        impl.getShortUrl(longUrl)?.let { emit(it) }
    }

    /**
     * Strategy 1: merge one flow per backend and take the first emitted value.
     * The backends are started from the caller's context.
     *
     * An exception thrown by a backend is not caught here and fails the call.
     *
     * Sample run (OK):
     *
     * 13:40:21,358 INFO NullImpl - running : main @coroutine#3
     * 13:40:21,380 INFO DumbImpl - running : main @coroutine#4
     * 13:40:21,386 INFO IsgdImpl - running : main @coroutine#5
     * 13:40:21,402 INFO IsgdImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-1 @coroutine#5
     * 13:40:21,416 INFO TinyImpl - running : main @coroutine#6
     * 13:40:21,419 INFO TinyImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-2 @coroutine#6
     * 13:40:23,029 INFO TinyImpl$getShortUrl$2 - returning http://tinyurl.com/389lo
     * 13:40:23,031 INFO IsgdImpl$getShortUrl$2 - returning https://is.gd/EuvYes
     * 13:40:23,126 INFO UrlShorterServiceTest$testHedging$1 - result = http://tinyurl.com/389lo
     *
     * @throws NoSuchElementException when every backend answers `null`
     */
    private suspend fun methodFlowMerge1(longUrl: String): String =
        impls.asFlow()
            .flatMapMerge(mergeConcurrency) { impl -> attempt(impl, longUrl) }
            .first()

    /**
     * Strategy 2: same as [methodFlowMerge1], but every per-backend flow is moved
     * to [Dispatchers.IO] with `flowOn`.
     *
     * No explicit clean-up follows `first()`: calling `cancelChildren()` on
     * `Dispatchers.IO` does nothing, because that context element carries no `Job`.
     *
     * Sample run (OK):
     *
     * 23:20:04,913 INFO TinyImpl - running : DefaultDispatcher-worker-7
     * 23:20:04,916 INFO NullImpl - running : DefaultDispatcher-worker-1
     * 23:20:04,915 INFO DumbImpl - running : DefaultDispatcher-worker-3
     * 23:20:04,913 INFO IsgdImpl - running : DefaultDispatcher-worker-2
     * 23:20:04,921 INFO TinyImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-7
     * 23:20:04,922 INFO IsgdImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-2
     * 23:20:06,092 INFO TinyImpl$getShortUrl$2 - returning http://tinyurl.com/389lo
     * 23:20:06,653 INFO IsgdImpl$getShortUrl$2 - returning https://is.gd/EuvYes
     * 23:20:06,655 INFO UrlShorterServiceTest$testHedging$1 - result = http://tinyurl.com/389lo
     *
     * @throws NoSuchElementException when every backend answers `null`
     */
    private suspend fun methodFlowMerge2(longUrl: String): String =
        impls.asFlow()
            .flatMapMerge(mergeConcurrency) { impl -> attempt(impl, longUrl).flowOn(Dispatchers.IO) }
            .first()

    /**
     * Strategy 3: launch one child coroutine per backend; each child sends its
     * answer (or `null` on failure) to a rendezvous channel. The first non-null
     * answer cancels the remaining children and is returned.
     *
     * Sample run (OK):
     *
     * 13:27:18,505 INFO NullImpl - running : main @coroutine#2
     * 13:27:18,511 INFO DumbImpl - running : main @coroutine#3
     * 13:27:18,515 INFO IsgdImpl - running : main @coroutine#4
     * 13:27:18,523 INFO IsgdImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-1 @coroutine#4
     * 13:27:18,524 INFO TinyImpl - running : main @coroutine#5
     * 13:27:18,549 INFO TinyImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-2 @coroutine#5
     * 13:27:20,684 INFO IsgdImpl$getShortUrl$2 - returning https://is.gd/EuvYes
     * 13:27:20,893 INFO TinyImpl$getShortUrl$2 - returning http://tinyurl.com/389lo
     * 13:27:20,895 INFO UrlShorterServiceTest$testHedging$1 - result = https://is.gd/EuvYes
     *
     * @throws Exception with message "All services failed" when every backend
     *   answers `null` or throws
     */
    private suspend fun methodChannel(longUrl: String): String {
        return coroutineScope {
            val chan = Channel<String?>()
            impls.forEach { impl ->
                launch {
                    val answer = try {
                        impl.getShortUrl(longUrl)
                    } catch (e: CancellationException) {
                        // A cancelled loser must simply stop, not report a failure.
                        throw e
                    } catch (e: Exception) {
                        null
                    }
                    chan.send(answer)
                }
            }
            // Exactly one message arrives per backend, so at most impls.size receives are needed.
            repeat(impls.size) {
                chan.receive()?.let { shortUrl ->
                    // Stop the remaining backends; otherwise coroutineScope would wait for them.
                    coroutineContext.cancelChildren()
                    return@coroutineScope shortUrl
                }
            }
            throw Exception("All services failed")
        }
    }
}
/**
 * Manual smoke test for [UrlShorterService]: it calls the real backends and
 * only logs the outcome, it asserts nothing.
 */
@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterServiceTest {

    /** Shortens one URL through all four implementations and logs the winning answer. */
    @Test
    fun testHedging() {
        val service = UrlShorterService(listOf(NullImpl(), DumbImpl(), IsgdImpl(), TinyImpl()))
        runBlocking {
            val shortUrl = service.getShortUrl("https://www.google.com")
            logger.info("result = {}", shortUrl)
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment