Skip to content

Instantly share code, notes, and snippets.

@AllanWang
Last active December 26, 2018 04:49
Show Gist options
  • Save AllanWang/cbc47ca7f3f490f63e05f7170ca545c0 to your computer and use it in GitHub Desktop.
Save AllanWang/cbc47ca7f3f490f63e05f7170ca545c0 to your computer and use it in GitHub Desktop.
Kotlin Flyweight
/**
* Flyweight to keep track of values so long as they are valid.
* Values that have been fetched within [maxAge] from the time of use will be reused.
* If multiple requests are sent with the same key, then the value should only be fetched once.
* Otherwise, they will be fetched using [fetcher].
* All requests will stem from the supplied [scope].
*/
class Flyweight<K, V>(
val scope: CoroutineScope,
val maxAge: Long,
val fetcher: suspend (K) -> V
) {
// Receives a key and a pending request
private val actionChannel = Channel<Pair<K, Continuation<V>>>()
// Receives a key to invalidate the associated value
private val invalidatorChannel = Channel<K>()
// Receives a key to fetch the value
private val requesterChannel = Channel<K>()
// Receives a key and the resulting value
private val receiverChannel = Channel<Pair<K, Result<V>>>()
// Keeps track of keys and associated update times
private val conditionMap = mutableMapOf<K, Long>()
// Keeps track of keys and associated values
private val resultMap = mutableMapOf<K, Result<V>>()
// Keeps track of unfulfilled actions
private val pendingMap = mutableMapOf<K, MutableList<Continuation<V>>>().withDefault { mutableListOf() }
init {
scope.launch {
select<Unit> {
/*
* New request received. Continuation should be fulfilled eventually
*/
actionChannel.onReceive { (key, continuation) ->
val lastUpdate = conditionMap[key]
val lastResult = resultMap[key]
// Valid value, retrieved within acceptable time
if (lastResult != null && lastUpdate != null && System.currentTimeMillis() - lastUpdate < maxAge) {
continuation.resumeWith(lastResult)
} else {
val valueRequestPending = key in pendingMap
pendingMap.getValue(key).add(continuation)
if (!valueRequestPending)
requesterChannel.send(key)
}
}
/*
* Invalidator received. Existing result associated with key should not be used.
* Note that any unfulfilled request and future requests should still operate, but with a new value.
*/
invalidatorChannel.onReceive { key ->
if (key !in resultMap) {
// Nothing to invalidate.
// If pending requests exist, they are already in the process of being updated.
return@onReceive
}
conditionMap.remove(key)
resultMap.remove(key)
if (pendingMap[key]?.isNotEmpty() == true)
// Refetch value for pending requests
requesterChannel.send(key)
}
/*
* Value request received. Should fetch new value using supplied fetcher
*/
requesterChannel.onReceive { key ->
val result = runCatching {
fetcher(key)
}
receiverChannel.send(key to result)
}
/*
* Value request fulfilled. Should now fulfill pending requests
*/
receiverChannel.onReceive { (key, result) ->
conditionMap[key] = System.currentTimeMillis()
resultMap[key] = result
pendingMap.remove(key)?.forEach { it.resumeWith(result) }
}
}
}
}
suspend fun fetch(key: K): V = suspendCoroutine {
scope.launch {
actionChannel.send(key to it)
}
}
suspend fun invalidate(key: K) {
invalidatorChannel.send(key)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment