Last active
December 15, 2017 10:02
-
-
Save billybong/d720fefe49a6db995eca525bd74cfbb8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.experimental.* | |
import kotlinx.coroutines.experimental.channels.Channel | |
import kotlinx.coroutines.experimental.channels.actor | |
class RequestCollapser<in ARGUMENT, BATCH_RESP, out RETURN_TYPE>( | |
private val threshold: Int, | |
private val fn: suspend (Collection<ARGUMENT>) -> BATCH_RESP, | |
private val remapper: (ARGUMENT, BATCH_RESP) -> RETURN_TYPE | |
) { | |
private val collapsingActor = actor<CollapserAgentMsg<ARGUMENT, RETURN_TYPE>>(CommonPool) { | |
val returnChannelsByArg: MutableMap<ARGUMENT, MutableList<Channel<RETURN_TYPE>>> = HashMap() | |
for (msg in channel) { | |
when (msg) { | |
is CollapserAgentMsg.CollapserTrigger<ARGUMENT, RETURN_TYPE> -> { | |
callActionFunction(returnChannelsByArg) | |
returnChannelsByArg.clear() | |
} | |
is CollapserAgentMsg.CollapserRequest<ARGUMENT, RETURN_TYPE> -> { | |
returnChannelsByArg.compute(msg.arg) { _, v -> | |
val channelList = v ?: mutableListOf() | |
channelList.add(msg.returnChannel) | |
channelList | |
} | |
val nrOfRequests = returnChannelsByArg.values.flatMap { it }.size | |
if (nrOfRequests >= threshold) { | |
callActionFunction(returnChannelsByArg) | |
returnChannelsByArg.clear() | |
//channel.send(CollapserAgentMsg.CollapserTrigger<ARGUMENT, RETURN_TYPE>()) | |
} | |
} | |
} | |
} | |
} | |
private suspend fun callActionFunction(returnChannelsByArg: MutableMap<ARGUMENT, MutableList<Channel<RETURN_TYPE>>>) { | |
val batchResponse = collapse(returnChannelsByArg.keys.toSet()) | |
returnChannelsByArg.forEach { arg, responseChannels -> | |
responseChannels.forEach { responseChannel -> | |
launch(CommonPool) { | |
responseChannel.send(remapper.invoke(arg, batchResponse)) | |
} | |
} | |
} | |
} | |
suspend fun request(param: ARGUMENT): Deferred<RETURN_TYPE> { | |
println("Received " + param) | |
val responseChannel = Channel.Factory.invoke<RETURN_TYPE>(1) | |
collapsingActor.send(CollapserAgentMsg.CollapserRequest(param, responseChannel)) | |
return async(CommonPool) { responseChannel.receive() } | |
} | |
private suspend fun collapse(params: Set<ARGUMENT>): BATCH_RESP = fn.invoke(params) | |
} | |
sealed class CollapserAgentMsg<ARGUMENT, RETURN_TYPE> { | |
class CollapserRequest<ARG, RETURN_TYPE>(val arg: ARG, val returnChannel: Channel<RETURN_TYPE>) : CollapserAgentMsg<ARG, RETURN_TYPE>() | |
class CollapserTrigger<ARGUMENT, RETURN_TYPE> : CollapserAgentMsg<ARGUMENT, RETURN_TYPE>() | |
} | |
class Tester { | |
companion object { | |
@JvmStatic | |
fun main(args: Array<String>) { | |
println("Setting up collapser") | |
val action: suspend (Collection<Int>) -> List<Int> = { i -> | |
println("Executing action - hopefully just once") | |
i.toList() | |
} | |
val collapser = RequestCollapser<Int, List<Int>, String>(3, action, { _, i -> i.toString() }) | |
runBlocking { | |
val a = collapser.request(1) | |
val b = collapser.request(2) | |
val c = collapser.request(3) | |
println(a.await()) | |
println(b.await()) | |
println(c.await()) | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment