Skip to content

Instantly share code, notes, and snippets.

@gubser
Last active January 31, 2019 19:46
Show Gist options
  • Save gubser/0888d947a701ad11b4b8fdccacdf1ed3 to your computer and use it in GitHub Desktop.
Save gubser/0888d947a701ad11b4b8fdccacdf1ed3 to your computer and use it in GitHub Desktop.
A bounded cache for async operations in F#
module AsyncBoundedCache
open System
open System.Collections.Concurrent
open System.Threading
open System.Threading.Tasks
/// Whole seconds (DateTime ticks divided by TimeSpan.TicksPerSecond), captured
/// once when this module is initialized.
let startTime = DateTime.UtcNow.Ticks / TimeSpan.TicksPerSecond

/// Whole seconds elapsed since `startTime`, as an int.
/// NOTE(review): derived from the wall clock (DateTime.UtcNow), so a system clock
/// adjustment can make the result jump; a monotonic source would be more robust.
let now () =
    let currentSeconds = DateTime.UtcNow.Ticks / TimeSpan.TicksPerSecond
    currentSeconds - startTime |> int
/// A value produced by the user-supplied request function, plus the
/// timestamps (in the seconds returned by `now`) used for expiry and eviction.
type CachedValue<'value> =
    {
        /// The value returned by the request.
        Value: 'value
        /// Time the value was retrieved; compared against the maximum retrieval age.
        RetrievedAt: int
        /// Time of the most recent read; updated in place on every access and used
        /// to evict the least recently accessed values first.
        mutable LastAccessedAt: int
    }
/// One slot of the cache: either a value that has already arrived, or an
/// in-flight request whose Task callers can await.
type CacheEntry<'value> =
    /// A value that has already been retrieved.
    | CachedValue of cached: CachedValue<'value>
    /// A request that has been started; its Task yields the value once it arrives.
    | PendingRequest of task: Task<'value>
/// A bounded, time-expiring cache in front of an asynchronous request function.
///
/// Reads of valid cached values and of in-flight requests are lock-free. On a miss,
/// `writeLock` is taken so that concurrent callers asking for the same uncached key
/// share a single in-flight request instead of each starting their own.
///
/// - maxCount: once the cache holds this many entries, entries are evicted before
///   a new key is inserted.
/// - removeBulkSize: number of entries evicted in addition to the excess, so that
///   eviction does not have to run on every insert. Values below 1 are treated as 1
///   so there is always room for the entry being inserted.
/// - maxRetrievalAge: how long (in the seconds returned by `now`) a retrieved value
///   stays valid.
/// - request: user-supplied function that produces the value for a key.
type AsyncBoundedCache<'key,'value> (maxCount: int, removeBulkSize: int, maxRetrievalAge: int,
                                    request: 'key -> Async<'value>) =
    let cache = ConcurrentDictionary<'key, CacheEntry<'value>>()

    // Every mutation of `cache` happens while holding this lock, so only one write
    // operation runs at a time. This way we can make sure that when two threads
    // request the same (uncached) value at the same time, it will not start two
    // independent, and potentially costly, request operations.
    let writeLock = new SemaphoreSlim(1, 1)

    // Eviction runs right before an insert, so it must free at least one slot;
    // otherwise a removeBulkSize of 0 would let the cache grow to maxCount + 1.
    let evictionBatch = max 1 removeBulkSize

    let isValid cachedValue =
        (now() - cachedValue.RetrievedAt) < maxRetrievalAge

    // A pending request is usable unless its task was cancelled or faulted.
    // Without this check a cancelled request would occupy its key forever.
    let isValidOrPending entry =
        match entry with
        | CachedValue cachedValue -> isValid cachedValue
        | PendingRequest task -> not (task.IsCanceled || task.IsFaulted)

    // Get the value from a CacheEntry; for cached values, also refresh the access time.
    let accessValue entry = async {
        match entry with
        | CachedValue cachedValue ->
            cachedValue.LastAccessedAt <- now()
            return cachedValue.Value
        | PendingRequest task ->
            return! Async.AwaitTask task
    }

    // Remove entries until there is room for one more. Needs writeLock.
    let havingLockEnsureBound () =
        if cache.Count >= maxCount then
            // Remove the excess plus a batch more, preferring the least recently
            // accessed cached values.
            let countToRemove = (cache.Count - maxCount) + evictionBatch
            let toRemoveCached =
                cache
                |> Seq.choose (fun kv ->
                    match kv.Value with
                    | CachedValue cachedValue -> Some (kv.Key, cachedValue)
                    | _ -> None)
                |> Seq.sortBy (fun (_, cachedValue) -> cachedValue.LastAccessedAt)
                |> Seq.map fst
                |> Seq.truncate countToRemove
                |> Seq.toList
            toRemoveCached |> List.iter (fun key -> cache.TryRemove key |> ignore)
            // Still too many entries: the loop above only removes CachedValues, so
            // all of them are gone and only PendingRequests remain.
            if cache.Count >= maxCount then
                let countToRemove = (cache.Count - maxCount) + evictionBatch
                // Which PendingRequest goes does not matter much: every pending
                // request is assumed to be recent. Consumers still hold a reference
                // to the task, so they receive their answer; it just won't be cached.
                let toRemove =
                    cache.Keys
                    |> Seq.truncate countToRemove
                    |> Seq.toList
                toRemove |> List.iter (fun key -> cache.TryRemove key |> ignore)

    // Make room for `key` unless it already has an entry: overwriting an existing
    // entry does not grow the cache, so nothing needs to be evicted. Needs writeLock.
    let havingLockEnsureRoomFor key =
        if not (cache.ContainsKey key) then havingLockEnsureBound ()

    // Start a Task that calls the user-supplied function and stores the result in the cache.
    // If the call fails, remove this request's PendingRequest entry and propagate the
    // exception to anyone currently waiting on the task.
    // Must be called while holding writeLock (see `ownTask` below).
    let startRequestTask key =
        // Holds the task created below. It is assigned while the caller still holds
        // writeLock, and the task body reads it only after acquiring writeLock itself,
        // so the body always sees the assigned task.
        let ownTask = ref Unchecked.defaultof<Task<'value>>
        let task =
            Async.StartAsTask(async {
                try
                    let! value = request key
                    let entry = CachedValue { Value = value; RetrievedAt = now(); LastAccessedAt = now() }
                    // We've got the value; now replace the cache entry.
                    do! Async.AwaitTask (writeLock.WaitAsync())
                    try
                        havingLockEnsureRoomFor key
                        cache.[key] <- entry
                    finally
                        writeLock.Release() |> ignore
                    return value
                with
                | e ->
                    // Something failed: remove our pending entry and propagate the
                    // exception to anyone waiting on this task.
                    do! Async.AwaitTask (writeLock.WaitAsync())
                    try
                        // Only remove the entry if it is still *this* request; the key
                        // may have been evicted and re-requested or re-filled meanwhile.
                        match cache.TryGetValue key with
                        | true, PendingRequest pending when obj.ReferenceEquals(pending, ownTask.Value) ->
                            cache.TryRemove key |> ignore
                        | _ -> ()
                    finally
                        writeLock.Release() |> ignore
                    return raise e
            })
        ownTask.Value <- task
        task

    // Acquire writeLock, then try to get the cached value or pending request.
    // On a miss, start a new request task.
    // The lock prevents two threads from starting a new task for the same key simultaneously.
    let getOrRequest key = async {
        do! Async.AwaitTask (writeLock.WaitAsync())
        let entry =
            try
                // Why look up the cache again here?
                // Consider two threads A and B that request the same value:
                // - A gets the lock.
                // - B cannot get the lock and waits.
                // - A reads the cache; it's a miss. A creates a PendingRequest and inserts it.
                // - A releases the lock and returns the value.
                // - B gets the lock.
                // - B reads the cache; it's a hit.
                // - B releases the lock and returns the value.
                match cache.TryGetValue(key) with
                | true, existingEntry when isValidOrPending existingEntry ->
                    // A valid or pending cache entry was found; use it.
                    existingEntry
                | _ ->
                    // Only an invalid entry, or none at all, was found: start a new request.
                    havingLockEnsureRoomFor key
                    let newEntry = PendingRequest (startRequestTask key)
                    cache.[key] <- newEntry
                    newEntry
            finally
                writeLock.Release() |> ignore
        return! accessValue entry
    }

    /// Returns the value for `key`. Valid cached values and pending requests are
    /// served by a lock-free read; if the cached value is too old or missing, a new
    /// request is started (or joined, if another caller started one meanwhile).
    member __.Get key = async {
        match cache.TryGetValue(key) with
        | true, entry when isValidOrPending entry ->
            return! accessValue entry
        | _ ->
            return! getOrRequest key
    }

    /// Number of entries currently stored, counting both cached values and pending requests.
    member __.Count = cache.Count
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment