-
-
Save Abegemot/157a548ac4c1725c66fc45e584a66cdd to your computer and use it in GitHub Desktop.
filtering duplicate calls
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* sample function to be called from http, a ktor route | |
* fkMutex checks whether the incoming key (in that case a number, but can be whatever ) | |
* is being executed, if so it delays its execution till the first one ends | |
* this feature make sense in many situations where the response is usually a cached value that can be fast delivered | |
* whereas, when not present the calculation of that response can be very costly, furthermore if you allow several users | |
* to perform the calculation at the same time | |
*/ | |
suspend fun Test(s:String): String { | |
loggerk.debug { "before inc Test($s) $calln" } | |
calln.incrementAndGet() | |
loggerk.debug { "after inc Test($s) $calln" } | |
val r= fkMutex(s){ | |
val i=s.toInt() | |
if(i==0) delay(5000) | |
if(i==1) delay(5) | |
if(i==2) delay(5) | |
if(i==3) delay(5) | |
if(i==4) delay(50) | |
"fake result" | |
} | |
return r | |
} | |
/* fkMutex is damn simple | |
* It relays on a KWaiter to run or suspend the execution of the request | |
* if the key is already present the function will suspend, but this implies that the same key is already being executed | |
* when that execution finishes KWaiter will notify all pending executions and run them and of course remove the key | |
*/ | |
suspend fun <T> fkMutex(key: String, f: suspend () -> T): T { | |
kWaiter.doWait(key, Waiter()) | |
return try { | |
f() | |
} catch (e: Exception) { | |
throw e | |
} finally { | |
kWaiter.releaseKey(key) | |
} | |
} | |
/* KWaiter contains a HashMap of keys and Waiters, each key has list of waiters corresponding to different calls with the same key | |
* waiters in turn are rendezvous channels for suspending the execution of the caller | |
* so when a key is found it's associated waiter is put in 'receive' mode and it will be awakened when the running process finishes | |
* and all the waiters corresponding to that key will be 'offered' to allow execution, this time in parallel | |
*/ | |
class KWaiter() { | |
val hm:HashMap<String,MutableList<Waiter>> = HashMap() | |
init{ | |
loggerk.debug { "Init KWaiter $hm " } | |
} | |
suspend fun doWait(key:String,w: Waiter) { | |
if(isKeyPresent(key,w,hm)){ | |
loggerk.debug { "wait $key $hm ($calln) request number" } | |
w.doWait() | |
}else { | |
loggerk.debug { "run $key $hm ($calln) request number" } | |
} | |
} | |
suspend fun releaseKey(key: String){ | |
removeKey(key,hm) | |
} | |
} | |
class Waiter(private val channel: Channel<Unit> = Channel<Unit>(0)) { | |
suspend fun doWait() { channel.receive() } | |
fun doNotify() { channel.offer(Unit) } | |
} | |
suspend fun removeKey(key:String,hm:HashMap<String,MutableList<Waiter>>){ | |
Mutex().withLock { | |
if (hm[key] != null) { | |
loggerk.debug { "$key notify all $hm" } | |
hm[key]?.forEach { it -> it.doNotify() } | |
hm.remove(key) | |
} else { | |
loggerk.debug { "key $key NULL" } | |
} | |
} | |
} | |
suspend fun isKeyPresent(key:String,w:Waiter,hm:HashMap<String,MutableList<Waiter>>):Boolean{ | |
Mutex().withLock { | |
return if (hm[key] == null) { | |
hm[key] = mutableListOf(w) | |
false | |
} else { | |
hm[key]?.add(w) | |
true | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment