Last active
November 7, 2019 18:27
-
-
Save smallufo/8bcaa1082f0b4b88e045237f494e975e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Created by smallufo on 2019-11-06. | |
*/ | |
package destiny | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.channels.Channel | |
import kotlinx.coroutines.flow.* | |
import mu.KotlinLogging | |
import org.apache.http.client.fluent.Request | |
import java.net.URLEncoder | |
import java.util.concurrent.ExecutorService | |
import java.util.concurrent.Executors | |
import java.util.concurrent.TimeUnit | |
import kotlin.test.Test | |
/**
 * A provider that can turn a long URL into a shorter one.
 */
interface UrlShorter {

    /**
     * Asks the provider for a short form of [longUrl].
     *
     * @param longUrl the URL to shorten
     * @return the shortened URL, or `null` when the provider has no result
     */
    fun getShortUrl(longUrl: String): String?
}
// File-level logger shared by every class declared in this file.
val logger = KotlinLogging.logger { }
/**
 * [UrlShorter] that calls the is.gd `create.php` endpoint with `format=simple`
 * and returns the response body as the short URL.
 *
 * The HTTP call is blocking; the name of the executing thread is logged first.
 */
class IsgdImpl : UrlShorter {

    override fun getShortUrl(longUrl: String): String? {
        logger.info("running : {}", Thread.currentThread().name)

        // The target is percent-encoded before it is placed into the query string.
        val encodedTarget = URLEncoder.encode(longUrl, "UTF-8")
        val endpoint = "https://is.gd/create.php?format=simple&url=$encodedTarget"

        val shortUrl = Request.Get(endpoint).execute().returnContent().asString()
        logger.info("returning {}", shortUrl)
        return shortUrl
    }
}
/**
 * [UrlShorter] that calls the tinyurl.com `api-create.php` endpoint and returns
 * the response body as the short URL.
 *
 * The HTTP call is blocking; the name of the executing thread is logged first.
 */
class TinyImpl : UrlShorter {

    override fun getShortUrl(longUrl: String): String? {
        logger.info("running : {}", Thread.currentThread().name)

        // Percent-encode the target so that characters such as '&', '#', '+' or
        // spaces inside longUrl are sent as part of the `url` parameter instead
        // of being parsed as the endpoint's own query/fragment syntax.
        val encodedTarget = URLEncoder.encode(longUrl, "UTF-8")
        val endpoint = "http://tinyurl.com/api-create.php?url=$encodedTarget"

        return Request.Get(endpoint).execute().returnContent().asString().also {
            logger.info("returning {}", it)
        }
    }
}
/**
 * [UrlShorter] that never produces a result: it logs the executing thread,
 * blocks that thread for ten seconds and then returns `null`.
 */
class DumbImpl : UrlShorter {

    override fun getShortUrl(longUrl: String): String? {
        logger.info("running : {}", Thread.currentThread().name)
        // Blocking sleep (not a coroutine delay): the thread is held for the whole time.
        Thread.sleep(TimeUnit.SECONDS.toMillis(10))
        return null
    }
}
/**
 * Sends one shortening request to every configured [UrlShorter] in parallel,
 * using a dedicated thread pool because the providers perform blocking calls.
 *
 * The class keeps several alternative strategies (`method1` .. `method6`) side
 * by side; [getShortUrl] currently delegates to `method4`.
 *
 * The service owns its thread pool. Call [close] when the service is no longer
 * needed, otherwise the pool threads stay alive.
 *
 * @param impls the providers to query; may be empty
 */
@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterService(private val impls: List<UrlShorter>) : AutoCloseable {

    // One worker per provider. Clamped to 1 because both
    // Executors.newFixedThreadPool and flatMapMerge reject a size of 0.
    private val parallelism: Int = maxOf(1, impls.size)

    private val es: ExecutorService = Executors.newFixedThreadPool(parallelism)

    private val esDispatcher = es.asCoroutineDispatcher()

    /**
     * Shortens [longUrl] using the configured providers.
     *
     * @return the first non-null provider result in list order, or [longUrl]
     *         itself when every provider returned `null`
     */
    suspend fun getShortUrl(longUrl: String): String {
        return method4(longUrl)
    }

    /**
     * Shuts down the worker pool. Calls that were already submitted are allowed
     * to finish; no new work is accepted afterwards.
     */
    override fun close() {
        es.shutdown()
    }

    /**
     * Applies [transform] to the elements in iteration order and returns the
     * first non-null result, or `null` when there is none.
     */
    private inline fun <T, R : Any> Iterable<T>.firstNotNullResult(transform: (T) -> R?): R? {
        for (element in this) {
            transform(element)?.let { return it }
        }
        return null
    }

    /**
     * Flow based variant: every provider becomes a one-element (or empty) flow
     * running on the pool, the flows are merged and the first emitted value wins.
     *
     * Throws [NoSuchElementException] when no provider yields a value.
     *
     * Recorded run (the winner is the fastest provider, but the call only
     * returned once DumbImpl's ten second sleep was over):
     *
     * 00:56:09,253 INFO TinyImpl - running : pool-1-thread-3
     * 00:56:09,254 INFO DumbImpl - running : pool-1-thread-1
     * 00:56:09,253 INFO IsgdImpl - running : pool-1-thread-2
     * 00:56:11,150 INFO TinyImpl - returning http://tinyurl.com/389lo
     * 00:56:13,604 INFO IsgdImpl - returning https://is.gd/EuvYes
     * 00:56:19,261 INFO UrlShorterServiceTest$testHedging$1 - result = http://tinyurl.com/389lo
     */
    private suspend fun method1(longUrl: String): String {
        // No explicit cancellation here: a dispatcher context carries no Job, so
        // calling cancelChildren() on it would not cancel anything.
        return impls.asFlow()
            .flatMapMerge(parallelism) { impl ->
                flow {
                    impl.getShortUrl(longUrl)?.also { emit(it) }
                }.flowOn(esDispatcher)
            }
            .first()
    }

    /**
     * `withContext(esDispatcher)` + `async(esDispatcher)` variant.
     *
     * All providers start in parallel, but the results are awaited in list
     * order, so the returned value is the first non-null one in that order and
     * the call lasts as long as the slowest provider awaited before it.
     * Falls back to [longUrl] when every provider returned `null`.
     *
     * Recorded run:
     *
     * 00:54:29,035 INFO IsgdImpl - running : pool-1-thread-3
     * 00:54:29,036 INFO DumbImpl - running : pool-1-thread-2
     * 00:54:29,035 INFO TinyImpl - running : pool-1-thread-1
     * 00:54:30,228 INFO TinyImpl - returning http://tinyurl.com/389lo
     * 00:54:30,797 INFO IsgdImpl - returning https://is.gd/EuvYes
     * 00:54:39,046 INFO UrlShorterServiceTest$testHedging$1 - result = https://is.gd/EuvYes
     */
    private suspend fun method2(longUrl: String): String {
        return withContext(esDispatcher) {
            val pending = impls.map { impl ->
                async(esDispatcher) { impl.getShortUrl(longUrl) }
            }
            pending.firstNotNullResult { it.await() } ?: longUrl
        }
    }

    /**
     * `coroutineScope` + `async(esDispatcher)` variant.
     *
     * Same ordering semantics as `method2`: results are awaited in list order.
     * Falls back to [longUrl] when every provider returned `null`.
     *
     * Recorded run:
     *
     * 00:52:30,681 INFO IsgdImpl - running : pool-1-thread-2
     * 00:52:30,682 INFO DumbImpl - running : pool-1-thread-1
     * 00:52:30,681 INFO TinyImpl - running : pool-1-thread-3
     * 00:52:31,838 INFO TinyImpl - returning http://tinyurl.com/389lo
     * 00:52:33,721 INFO IsgdImpl - returning https://is.gd/EuvYes
     * 00:52:40,691 INFO UrlShorterServiceTest$testHedging$1 - result = https://is.gd/EuvYes
     */
    private suspend fun method3(longUrl: String): String {
        return coroutineScope {
            val pending = impls.map { impl ->
                async(esDispatcher) { impl.getShortUrl(longUrl) }
            }
            pending.firstNotNullResult { it.await() } ?: longUrl
        }
    }

    /**
     * `withContext(esDispatcher)` + plain `async` variant (the children inherit
     * the dispatcher from the enclosing context). Used by [getShortUrl].
     *
     * Same ordering semantics as `method2`: results are awaited in list order.
     * Falls back to [longUrl] when every provider returned `null`.
     *
     * Recorded run:
     *
     * 01:58:56,930 INFO TinyImpl - running : pool-1-thread-1
     * 01:58:56,933 INFO DumbImpl - running : pool-1-thread-2
     * 01:58:56,930 INFO IsgdImpl - running : pool-1-thread-3
     * 01:58:58,411 INFO TinyImpl - returning http://tinyurl.com/389lo
     * 01:58:59,026 INFO IsgdImpl - returning https://is.gd/EuvYes
     * 01:59:06,942 INFO UrlShorterServiceTest$testHedging$1 - result = https://is.gd/EuvYes
     */
    private suspend fun method4(longUrl: String): String {
        return withContext(esDispatcher) {
            val pending = impls.map { impl ->
                async { impl.getShortUrl(longUrl) }
            }
            pending.firstNotNullResult { it.await() } ?: longUrl
        }
    }

    /**
     * Channel based variant: every provider sends its non-null result into a
     * channel and the first value that was sent is returned.
     *
     * Throws [NoSuchElementException] when no provider yields a value.
     */
    private suspend fun method5(longUrl: String): String {
        // Unlimited buffer: nobody receives while the producers run, so a
        // rendezvous channel would suspend every sender forever.
        val channel = Channel<String>(Channel.UNLIMITED)
        withContext(esDispatcher) {
            impls.forEach { impl ->
                launch {
                    impl.getShortUrl(longUrl)?.also { channel.send(it) }
                }
            }
        }
        // withContext returns only after all launched children have completed,
        // so closing here can no longer race with a pending send.
        channel.close()
        logger.info("channel closed")
        return channel.consumeAsFlow().first()
    }

    /**
     * Channel based variant that reports every provider outcome (`null` for
     * "no result" or "failed") and returns as soon as a non-null one arrives.
     *
     * The providers are blocking, so they are launched on the pool; launching
     * them on the caller's context would run them one after another on the
     * caller's thread.
     *
     * @throws Exception with message "All services failed" when no provider
     *         produced a short URL
     */
    private suspend fun method6(longUrl: String): String {
        return coroutineScope {
            val chan = Channel<String?>()
            impls.forEach { impl ->
                launch(esDispatcher) {
                    val outcome: String? = try {
                        impl.getShortUrl(longUrl)
                    } catch (e: CancellationException) {
                        // Cancellation is not a provider failure; let it propagate.
                        throw e
                    } catch (e: Exception) {
                        logger.warn("provider failed : {}", impl.javaClass.simpleName, e)
                        null
                    }
                    chan.send(outcome)
                }
            }
            // Every provider sends exactly one outcome, so at most impls.size receives.
            repeat(impls.size) {
                val shortUrl = chan.receive()
                if (shortUrl != null) {
                    // A winner was found: the remaining providers are no longer needed.
                    coroutineContext.cancelChildren()
                    return@coroutineScope shortUrl
                }
            }
            throw Exception("All services failed")
        }
    }
}
/**
 * Manual smoke test for [UrlShorterService]; it calls the real providers and
 * only logs the outcome.
 *
 * For a comparable example built on Spring Flux see
 * https://github.com/spring-tips/hedging/blob/master/client/src/main/java/com/example/client/ClientApplication.java
 */
@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterServiceTest {

    @Test
    fun testHedging() {
        // DumbImpl is listed first on purpose: it is the slow provider without a result.
        val service = UrlShorterService(listOf(DumbImpl(), IsgdImpl(), TinyImpl()))
        runBlocking {
            val shortUrl = service.getShortUrl("https://www.google.com")
            logger.info("result = {}", shortUrl)
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment