Last active
December 26, 2018 04:49
-
-
Save AllanWang/cbc47ca7f3f490f63e05f7170ca545c0 to your computer and use it in GitHub Desktop.
Kotlin Flyweight
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Flyweight to keep track of values so long as they are valid.
 *
 * A result fetched less than [maxAge] milliseconds before the time of use is reused.
 * Otherwise the value is fetched using [fetcher]. If multiple requests arrive for the same key
 * while a fetch is outstanding, they all wait for that fetch instead of starting another one.
 * Failed fetches are stored and replayed exactly like successful ones until they expire
 * or are invalidated.
 *
 * All coroutines are launched from the supplied [scope]. The bookkeeping maps are only ever
 * touched by the single event-loop coroutine started in `init`, so they need no extra locking.
 *
 * Limitation: requests are only resumed by the event loop, so callers suspended in [fetch]
 * are not resumed once [scope] has been cancelled.
 *
 * @param scope scope in which the event loop and every fetch are launched
 * @param maxAge maximum age, in milliseconds, for a stored result to be reused
 * @param fetcher produces the value for a key; anything it throws is delivered to the waiting callers
 */
class Flyweight<K, V>(
    val scope: CoroutineScope,
    val maxAge: Long,
    val fetcher: suspend (K) -> V
) {
    // Receives a key and a pending request
    private val actionChannel = Channel<Pair<K, Continuation<V>>>()

    // Receives a key to invalidate the associated value
    private val invalidatorChannel = Channel<K>()

    // Receives a key and the resulting value
    private val receiverChannel = Channel<Pair<K, Result<V>>>()

    // Keeps track of keys and associated update times (System.currentTimeMillis())
    private val conditionMap: MutableMap<K, Long> = mutableMapOf()

    // Keeps track of keys and associated values
    private val resultMap: MutableMap<K, Result<V>> = mutableMapOf()

    // Keeps track of unfulfilled actions. A key is present exactly while a fetch for it is outstanding.
    private val pendingMap: MutableMap<K, MutableList<Continuation<V>>> = mutableMapOf()

    init {
        scope.launch {
            // select handles a single event per call, so it has to be looped.
            // The loop ends when the scope is cancelled: select is a cancellable suspension point.
            while (true) {
                select<Unit> {
                    actionChannel.onReceive { (key, continuation) -> handleRequest(key, continuation) }
                    invalidatorChannel.onReceive { key -> handleInvalidation(key) }
                    receiverChannel.onReceive { (key, result) -> handleResult(key, result) }
                }
            }
        }
    }

    /**
     * New request received. Resumes [continuation] immediately from the stored result if it is
     * still fresh; otherwise queues it and starts a fetch unless one is already outstanding.
     */
    private fun handleRequest(key: K, continuation: Continuation<V>) {
        val lastUpdate = conditionMap[key]
        val lastResult = resultMap[key]
        // Valid value, retrieved within acceptable time
        if (lastResult != null && lastUpdate != null && System.currentTimeMillis() - lastUpdate < maxAge) {
            continuation.resumeWith(lastResult)
            return
        }
        val fetchOutstanding = key in pendingMap
        // getOrPut stores the new list in the map; a withDefault/getValue lookup would hand back
        // a throwaway list and the continuation would be lost.
        pendingMap.getOrPut(key) { mutableListOf() }.add(continuation)
        if (!fetchOutstanding) {
            fulfill(key)
        }
    }

    /**
     * Invalidator received. The stored result for [key] must not be used any more.
     * Unfulfilled and future requests still operate, but trigger a new fetch.
     */
    private fun handleInvalidation(key: K) {
        if (key !in resultMap) {
            // Nothing to invalidate.
            // If pending requests exist, they are already in the process of being updated.
            return
        }
        conditionMap.remove(key)
        resultMap.remove(key)
        if (pendingMap[key]?.isNotEmpty() == true) {
            // Refetch value for pending requests
            fulfill(key)
        }
    }

    /**
     * Value request fulfilled. Stores [result] with the current time and resumes every
     * request that was waiting on [key].
     */
    private fun handleResult(key: K, result: Result<V>) {
        conditionMap[key] = System.currentTimeMillis()
        resultMap[key] = result
        pendingMap.remove(key)?.forEach { it.resumeWith(result) }
    }

    /**
     * Fetches the value for [key] in its own coroutine and reports the outcome to the event loop.
     * Running the fetch outside the loop keeps other requests and invalidations responsive, and
     * avoids the loop sending to a rendezvous channel that only it receives from.
     */
    private fun fulfill(key: K) {
        scope.launch {
            // Every failure is captured on purpose so that waiting callers are always resumed.
            val result = runCatching { fetcher(key) }
            receiverChannel.send(key to result)
        }
    }

    /**
     * Returns the value for [key], reusing a stored result when it is still fresh.
     * Suspends until the event loop resumes the request; rethrows whatever [fetcher] threw
     * if the stored or newly fetched result is a failure.
     */
    suspend fun fetch(key: K): V = suspendCoroutine { continuation ->
        scope.launch {
            actionChannel.send(key to continuation)
        }
    }

    /**
     * Discards the stored result for [key], if any, so the next request fetches a new value.
     * Suspends until the event loop has accepted the invalidation.
     */
    suspend fun invalidate(key: K) {
        invalidatorChannel.send(key)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment