Last active
January 31, 2019 19:46
-
-
Save gubser/0888d947a701ad11b4b8fdccacdf1ed3 to your computer and use it in GitHub Desktop.
A bounded cache for async operations in F#
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module AsyncBoundedCache

open System
open System.Collections.Concurrent
open System.Threading
open System.Threading.Tasks

// Time base for the whole module, captured once at load time, in whole
// seconds (1 tick = 100 ns, so 10_000_000 ticks = 1 second).
let startTime = DateTime.UtcNow.Ticks / 10000000L

// Seconds elapsed since module initialisation. Kept relative to startTime
// so the value comfortably fits in an int.
let now () =
    let secondsSinceEpoch = DateTime.UtcNow.Ticks / 10000000L
    int (secondsSinceEpoch - startTime)
/// A successfully retrieved value together with its cache bookkeeping.
/// All times are whole seconds relative to module start (see `now`).
type CachedValue<'value> = {
    /// The cached payload.
    Value: 'value
    /// When the value was fetched; compared against maxRetrievalAge to
    /// decide staleness.
    RetrievedAt: int
    /// Updated on every read; drives least-recently-used eviction.
    mutable LastAccessedAt: int
}
/// A cache slot: either a completed value or a request still in flight.
/// Storing the pending Task lets concurrent callers await the same request
/// instead of starting duplicate retrievals.
type CacheEntry<'value> =
    /// A value that has been retrieved and can be served immediately.
    | CachedValue of CachedValue<'value>
    /// A retrieval that has been started but has not yet completed.
    | PendingRequest of Task<'value>
/// A bounded, in-memory cache for async retrievals.
/// - maxCount: soft upper bound on the number of entries kept.
/// - removeBulkSize: extra entries evicted each time the bound is hit, so
///   eviction does not have to run on every insert.
/// - maxRetrievalAge: seconds (as measured by `now`) a cached value stays valid.
/// - request: user-supplied function that fetches the value for a key.
type AsyncBoundedCache<'key,'value> (maxCount: int, removeBulkSize: int, maxRetrievalAge: int,
                                     request: 'key -> Async<'value>) =

    let cache = ConcurrentDictionary<'key, CacheEntry<'value>>()

    // Lock to limit to only one write operation at a time.
    // This way we can make sure that when two threads request the same
    // (uncached) value at the same time, it will not start two
    // independent, and potentially costly, request operations.
    // NOTE(review): the semaphore is never disposed; SemaphoreSlim only
    // strictly requires Dispose when AvailableWaitHandle is used — confirm.
    let writeLock = new SemaphoreSlim(1, 1)

    // A cached value is valid while it is younger than maxRetrievalAge seconds.
    let isValid cachedValue =
        (now() - cachedValue.RetrievedAt) < maxRetrievalAge

    // An entry may be served if it is a still-valid value, or a request that
    // is in flight (pending requests are assumed to be recent enough).
    let isValidOrPending entry =
        match entry with
        | CachedValue cachedValue -> isValid cachedValue
        | PendingRequest _ -> true

    // Get the value from a CacheEntry and update its access time.
    // The access time feeds the LRU ordering in havingLockEnsureBound.
    // Awaiting a PendingRequest propagates that task's result or exception.
    let accessValue entry = async {
        match entry with
        | CachedValue cachedValue ->
            cachedValue.LastAccessedAt <- now()
            return cachedValue.Value
        | PendingRequest task ->
            return! Async.AwaitTask task
    }

    // Remove entries from the cache until bounds are satisfied. Needs writeLock.
    // First pass evicts least-recently-accessed CachedValues only; a second
    // pass removes arbitrary entries (i.e. PendingRequests) if still over bound.
    let havingLockEnsureBound () =
        if cache.Count >= maxCount then
            // remove excess cached values, and some more while we're at it
            let countToRemove = (cache.Count - maxCount) + removeBulkSize
            let toRemoveCached =
                cache
                |> Seq.choose (fun kv ->
                    match kv.Value with
                    | CachedValue cachedValue -> Some (kv.Key, cachedValue)
                    | _ -> None
                )
                |> Seq.sortBy (fun (_, cachedValue) -> cachedValue.LastAccessedAt)
                |> Seq.map fst
                |> Seq.truncate countToRemove
                |> Seq.toList
            toRemoveCached |> List.iter (fun e -> cache.TryRemove e |> ignore)
            // check if we satisfy the bound now
            if cache.Count >= maxCount then
                // there are still too many items. above we have only removed
                // CachedValues. Apparently we have deleted them all and we are now left with
                // PendingRequests only.
                let countToRemove = (cache.Count - maxCount) + removeBulkSize
                // I don't care which PendingRequest was accessed least recently
                // because I assume every PendingRequest is recent enough.
                //
                // any consumer will still hold a reference to the task so they will
                // receive their answer. it just won't be cached here.
                let toRemove =
                    cache.Keys
                    |> Seq.truncate countToRemove
                    |> Seq.toList
                toRemove |> List.iter (fun e -> cache.TryRemove e |> ignore)

    // Start a Task that calls the user-supplied function and store the result in the cache.
    // If there was an exception during the call, remove the CacheEntry and
    // propagate the exception to anyone that currently waits on the task.
    let startRequestTask key = Async.StartAsTask(async {
        try
            let! value = request key
            let entry = CachedValue { Value = value; RetrievedAt = now(); LastAccessedAt = now() }
            // we've got the value. now replace cached entry
            do! Async.AwaitTask (writeLock.WaitAsync())
            try
                // Evict before inserting so the write itself never pushes the
                // cache past its bound.
                havingLockEnsureBound ()
                cache.[key] <- entry
            finally
                // Release returns the previous count; not needed here.
                writeLock.Release() |> ignore
            return value
        with
        | e ->
            // something failed. remove cached entry and propagate exception
            // to anyone that waits on this task
            // NOTE(review): this removes whatever entry is under `key`, which
            // in a rare interleaving could be a newer entry — confirm acceptable.
            do! Async.AwaitTask (writeLock.WaitAsync())
            try
                cache.TryRemove key |> ignore
            finally
                writeLock.Release() |> ignore
            // NOTE(review): `raise e` re-raises without preserving the original
            // stack trace; ExceptionDispatchInfo.Capture(e).Throw() would — TODO.
            return raise e
    })

    // Acquire the writeLock, try to get the cached value or pending request.
    // If its a miss, start a new request task.
    // The lock is necessary to avoid two threads to start a new task simultaneously.
    let getOrRequest key = async {
        do! Async.AwaitTask (writeLock.WaitAsync())
        let entry =
            try
                // Why do we try to get the cached value here?
                // Consider two threads A, B that request the same value:
                // - A gets the lock
                // - B cannot get the lock, waits
                // - A reads the cache, it's a miss. Creates a PendingRequest and inserts it in the cache.
                // - A releases the lock and returns the value.
                // - B gets the lock
                // - B reads the cache, it's a hit.
                // - B releases the lock and returns the value.
                match cache.TryGetValue(key) with
                | true, existingEntry when isValidOrPending existingEntry ->
                    // A valid or pending cache entry found, use that.
                    existingEntry
                | _ ->
                    // Start a new request task because only an invalid entry was
                    // found or none at all.
                    havingLockEnsureBound ()
                    let newEntry = PendingRequest (startRequestTask key)
                    cache.[key] <- newEntry
                    newEntry
            finally
                writeLock.Release() |> ignore
        // Await (and LRU-touch) outside the lock so slow requests do not
        // serialize all other cache writes.
        return! accessValue entry
    }

    // perform a lock-free read.
    // if the cached value is too old or missing, it starts a new request operation.
    member __.Get key = async {
        match cache.TryGetValue(key) with
        | true, entry when isValidOrPending entry ->
            return! accessValue entry
        | _ ->
            return! getOrRequest key
    }

    /// Current number of entries, counting both cached values and pending requests.
    member __.Count = cache.Count
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment