Skip to content

Instantly share code, notes, and snippets.

@billybong
Last active December 15, 2017 10:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save billybong/d720fefe49a6db995eca525bd74cfbb8 to your computer and use it in GitHub Desktop.
Save billybong/d720fefe49a6db995eca525bd74cfbb8 to your computer and use it in GitHub Desktop.
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.channels.actor
/**
 * Collapses individual requests into a single batched call.
 *
 * Every call to [request] is queued on an internal actor. As soon as the number of queued
 * requests reaches [threshold] (or [flush] is called), [fn] is invoked once with the distinct
 * set of queued arguments, and each waiting caller receives `remapper(arg, batchResponse)`.
 *
 * If [fn] or [remapper] throws, the affected callers' [Deferred] results fail with that
 * exception instead of waiting forever.
 *
 * @param threshold number of queued requests that triggers a batch call.
 * @param fn the batched action, called with the distinct arguments of the queued requests.
 * @param remapper extracts the result for a single argument from the batch response.
 */
class RequestCollapser<in ARGUMENT, BATCH_RESP, out RETURN_TYPE>(
    private val threshold: Int,
    private val fn: suspend (Collection<ARGUMENT>) -> BATCH_RESP,
    private val remapper: (ARGUMENT, BATCH_RESP) -> RETURN_TYPE
) {
    // All mutable state lives inside the actor, so it is only touched by one coroutine at a time.
    private val collapsingActor = actor<CollapserAgentMsg<ARGUMENT, RETURN_TYPE>>(CommonPool) {
        val returnChannelsByArg: MutableMap<ARGUMENT, MutableList<Channel<RETURN_TYPE>>> = HashMap()
        for (msg in channel) {
            when (msg) {
                is CollapserAgentMsg.CollapserTrigger<ARGUMENT, RETURN_TYPE> -> {
                    callActionFunction(returnChannelsByArg)
                    returnChannelsByArg.clear()
                }
                is CollapserAgentMsg.CollapserRequest<ARGUMENT, RETURN_TYPE> -> {
                    returnChannelsByArg.getOrPut(msg.arg) { mutableListOf() }.add(msg.returnChannel)
                    // Count waiting callers (not distinct arguments) without building a flattened list.
                    val nrOfRequests = returnChannelsByArg.values.sumBy { it.size }
                    if (nrOfRequests >= threshold) {
                        callActionFunction(returnChannelsByArg)
                        returnChannelsByArg.clear()
                    }
                }
            }
        }
    }

    /**
     * Runs the batched action for everything currently queued and hands each waiting channel
     * its remapped result. Does nothing when no request is queued.
     */
    private suspend fun callActionFunction(returnChannelsByArg: MutableMap<ARGUMENT, MutableList<Channel<RETURN_TYPE>>>) {
        if (returnChannelsByArg.isEmpty()) return

        val batchResponse = try {
            collapse(returnChannelsByArg.keys.toSet())
        } catch (e: Exception) {
            // Propagate the failure to every waiting caller; otherwise their receive() never returns
            // and the actor itself would terminate, breaking all later requests too.
            returnChannelsByArg.values.forEach { channels -> channels.forEach { it.close(e) } }
            if (e is java.util.concurrent.CancellationException) throw e
            return
        }

        for ((arg, responseChannels) in returnChannelsByArg) {
            for (responseChannel in responseChannels) {
                // Remapping runs off the actor so a slow remapper does not stall the request queue.
                launch(CommonPool) {
                    try {
                        responseChannel.send(remapper(arg, batchResponse))
                    } catch (e: Exception) {
                        responseChannel.close(e)
                    }
                }
            }
        }
    }

    /**
     * Queues [param] for the next batch call.
     *
     * @return a [Deferred] that completes with the remapped result for [param], or fails with
     *   the exception thrown by the batched action or the remapper.
     */
    suspend fun request(param: ARGUMENT): Deferred<RETURN_TYPE> {
        println("Received $param")
        val responseChannel = Channel.Factory.invoke<RETURN_TYPE>(1)
        collapsingActor.send(CollapserAgentMsg.CollapserRequest(param, responseChannel))
        return async(CommonPool) { responseChannel.receive() }
    }

    /**
     * Forces a batch call for the requests queued so far, even if [threshold] has not been
     * reached. Without this, requests below the threshold wait until more requests arrive.
     */
    suspend fun flush() {
        collapsingActor.send(CollapserAgentMsg.CollapserTrigger<ARGUMENT, RETURN_TYPE>())
    }

    private suspend fun collapse(params: Set<ARGUMENT>): BATCH_RESP = fn.invoke(params)
}
/**
 * Messages understood by the collapsing actor.
 *
 * @param ARGUMENT type of a single request argument.
 * @param RETURN_TYPE type of the per-request result.
 */
sealed class CollapserAgentMsg<ARGUMENT, RETURN_TYPE> {

    /** A single request: the argument plus the channel on which its result is to be delivered. */
    class CollapserRequest<ARGUMENT, RETURN_TYPE>(
        val arg: ARGUMENT,
        val returnChannel: Channel<RETURN_TYPE>
    ) : CollapserAgentMsg<ARGUMENT, RETURN_TYPE>()

    /** Asks the actor to run the batched action for whatever is currently queued. */
    class CollapserTrigger<ARGUMENT, RETURN_TYPE> : CollapserAgentMsg<ARGUMENT, RETURN_TYPE>()
}
/** Small manual demo: issues three requests that should be served by one batched call. */
class Tester {
    companion object {
        /** Entry point; prints the result of each collapsed request. */
        @JvmStatic
        fun main(args: Array<String>) {
            println("Setting up collapser")

            // The batched action simply echoes its arguments back as a list.
            val batchAction: suspend (Collection<Int>) -> List<Int> = { batch ->
                println("Executing action - hopefully just once")
                batch.toList()
            }

            // Threshold of 3 means the third request below triggers the batch call.
            val collapser = RequestCollapser<Int, List<Int>, String>(3, batchAction) { _, batchResult ->
                batchResult.toString()
            }

            runBlocking {
                val pendingResults = listOf(1, 2, 3).map { collapser.request(it) }
                pendingResults.forEach { println(it.await()) }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment