Skip to content

Instantly share code, notes, and snippets.

@Abegemot
Created April 8, 2021 08:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Abegemot/157a548ac4c1725c66fc45e584a66cdd to your computer and use it in GitHub Desktop.
Save Abegemot/157a548ac4c1725c66fc45e584a66cdd to your computer and use it in GitHub Desktop.
filtering duplicate calls
/* Sample function meant to be called from an HTTP handler (a Ktor route).
* fkMutex checks whether the incoming key (a number in this case, but it can be anything)
* is already being processed; if so, it delays this call until the first one ends.
* This makes sense in the many situations where the response is usually a cached value that can be delivered quickly,
* whereas computing it when it is not cached can be very costly, and even more so if several users are allowed
* to perform the calculation at the same time.
*/
/**
 * Sample request handler that simulates work of varying cost for a numeric key.
 *
 * Bumps the global call counter (logging it before and after), then runs the simulated work
 * through [fkMutex] using [s] as the key.
 *
 * @param s the request key; it is parsed with `toInt()`, so a non-numeric value throws
 *   `NumberFormatException` from inside the guarded block.
 * @return the constant string `"fake result"`.
 */
suspend fun Test(s: String): String {
    loggerk.debug { "before inc Test($s) $calln" }
    calln.incrementAndGet()
    loggerk.debug { "after inc Test($s) $calln" }
    return fkMutex(s) {
        // Simulated cost per key: key 0 is the slow one, keys 1-4 are fast, anything else is instant.
        when (s.toInt()) {
            0 -> delay(5000)
            1, 2, 3 -> delay(5)
            4 -> delay(50)
        }
        "fake result"
    }
}
/* fkMutex is damn simple.
* It relies on a KWaiter to either run or suspend the execution of the request.
* If the key is already present the function suspends, since that means a call with the same key is already being executed.
* When that execution finishes, KWaiter notifies all pending executions so they can run and, of course, removes the key.
*/
/**
 * Runs [f] guarded by [key].
 *
 * A fresh [Waiter] is registered for [key] through `kWaiter.doWait`, which returns only once this
 * call is allowed to proceed. [f] is then executed and, whether it returns or throws,
 * `kWaiter.releaseKey` is called for [key].
 *
 * NOTE(review): every caller releases the key when it finishes, including callers that had to wait.
 * If a new call registers the same key while previously woken callers are still running, one of
 * them will release that newer registration as well — confirm this is acceptable.
 *
 * @param key identifies calls that must not start while another call with the same key is running.
 * @param f the work to execute.
 * @return whatever [f] returns; any exception thrown by [f] propagates unchanged.
 */
suspend fun <T> fkMutex(key: String, f: suspend () -> T): T {
    kWaiter.doWait(key, Waiter())
    // No catch clause: catching only to rethrow adds nothing, `finally` alone guarantees the release.
    return try {
        f()
    } finally {
        kWaiter.releaseKey(key)
    }
}
/* KWaiter contains a HashMap from keys to Waiters; each key has a list of waiters corresponding to the different calls made with that key.
* Each Waiter, in turn, wraps a channel that is used to suspend the execution of the caller.
* So when a key is found, its associated waiter is put in 'receive' mode and is awakened when the running call finishes:
* all the waiters registered for that key are then 'offered' a value to allow them to execute, this time in parallel.
*/
/**
 * Registry of in-flight keys and the [Waiter]s queued behind each of them.
 */
class KWaiter {
    /** Waiters registered per key; a key is present from its first registration until it is released. */
    val hm: HashMap<String, MutableList<Waiter>> = HashMap()

    init {
        loggerk.debug { "Init KWaiter $hm " }
    }

    /**
     * Registers [w] under [key] and, if the key was already registered, suspends on [w]
     * until it is notified. Returns immediately when [key] was not registered before.
     */
    suspend fun doWait(key: String, w: Waiter) {
        val alreadyRegistered = isKeyPresent(key, w, hm)
        if (!alreadyRegistered) {
            loggerk.debug { "run $key $hm ($calln) request number" }
            return
        }
        loggerk.debug { "wait $key $hm ($calln) request number" }
        w.doWait()
    }

    /** Notifies every waiter registered under [key] and removes the key from the registry. */
    suspend fun releaseKey(key: String) {
        removeKey(key, hm)
    }
}
/**
 * One-shot signal used to park a caller until it is notified.
 *
 * The default channel is conflated rather than rendezvous: [doNotify] does not suspend, so with a
 * rendezvous channel a notification sent before the waiter has reached [doWait] would be dropped
 * and the waiter would then suspend forever. A conflated channel keeps the signal until it is consumed.
 *
 * @param channel the channel carrying the wake-up signal; callers may still inject their own.
 */
class Waiter(private val channel: Channel<Unit> = Channel<Unit>(Channel.CONFLATED)) {
    /** Suspends until a notification is available, then consumes it. */
    suspend fun doWait() { channel.receive() }

    /**
     * Posts the wake-up signal without suspending.
     * NOTE(review): `offer` is deprecated in newer kotlinx.coroutines releases in favour of `trySend`;
     * switch when the project's coroutines version allows it.
     */
    fun doNotify() { channel.offer(Unit) }
}
/**
 * Single lock shared by [removeKey] and [isKeyPresent].
 *
 * A mutex only excludes callers that lock the same instance; creating a new `Mutex()` on every
 * call protects nothing, so the lock has to live outside the functions.
 */
private val waiterMapMutex = Mutex()

/**
 * Notifies every [Waiter] registered under [key] in [hm] and removes the key.
 * Only logs when the key is not registered.
 *
 * The body runs in a non-cancellable context: this function is reached from the `finally` block of
 * `fkMutex` (through `KWaiter.releaseKey`), and acquiring the lock must not be aborted by the
 * caller's cancellation, otherwise the key would stay registered and its waiters would never wake up.
 *
 * @param key the key to release.
 * @param hm the registry of waiters per key.
 */
suspend fun removeKey(key: String, hm: HashMap<String, MutableList<Waiter>>) {
    kotlinx.coroutines.withContext(kotlinx.coroutines.NonCancellable) {
        waiterMapMutex.withLock {
            val waiters = hm[key]
            if (waiters == null) {
                loggerk.debug { "key $key NULL" }
            } else {
                loggerk.debug { "$key notify all $hm" }
                waiters.forEach { it.doNotify() }
                hm.remove(key)
            }
        }
    }
}

/**
 * Registers [w] under [key] in [hm] and reports whether the key was already registered.
 *
 * The check and the registration happen under the same lock, so two concurrent callers cannot
 * both observe the key as absent.
 *
 * @param key the key being requested.
 * @param w the waiter to register for this call.
 * @param hm the registry of waiters per key.
 * @return `false` if [key] was absent (a new entry holding [w] is created), `true` if it was
 *   already present ([w] is appended to the existing list).
 */
suspend fun isKeyPresent(key: String, w: Waiter, hm: HashMap<String, MutableList<Waiter>>): Boolean =
    waiterMapMutex.withLock {
        val waiters = hm[key]
        if (waiters == null) {
            hm[key] = mutableListOf(w)
            false
        } else {
            waiters.add(w)
            true
        }
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment