Created
July 8, 2020 17:19
-
-
Save bartelink/ea3e9930f338729d41f7c18990c03dc1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Thread-safe coordinator that batches concurrent requests for a single <c>dispatch</> invocation such that: | |
/// - requests arriving together can be coalesced into the batch during the linger period via TryAdd | |
/// - callers that have had items admitted can concurrently await the shared fate of the dispatch via AwaitResult | |
/// - callers whose TryAdd has been denied can await the completion of the in-flight batch via AwaitCompletion | |
type internal AsyncBatch<'Req, 'Res>(dispatch : 'Req[] -> Async<'Res>, linger : System.TimeSpan) = | |
let lingerMs = int linger.TotalMilliseconds | |
// Yes, naive impl in the absence of a cleaner way to have callers sharing the AwaitCompletion coordinate the adding | |
do if lingerMs < 5 then invalidArg "linger" "must be >= 5ms" | |
let queue = new System.Collections.Concurrent.BlockingCollection<'Req>() | |
let dispatch = async { | |
do! Async.Sleep lingerMs | |
queue.CompleteAdding() | |
let reqs = queue.ToArray() | |
return! dispatch reqs | |
} | |
let task = lazy (Async.StartAsTask dispatch) | |
/// Attempt to add a request to the flight | |
/// Succeeds during linger interval (which commences when the first caller triggers the dispatch via AwaitResult) | |
/// Fails if this flight has closed (caller should generate a fresh, potentially after awaiting this.AwaitCompletion) | |
member __.TryAdd(item) = queue.TryAdd(item) | |
/// Await the outcome of dispatching the batch (on the basis that the caller has contributed a request to it) | |
member __.AwaitResult() = Async.AwaitTaskCorrect task.Value | |
/// Wait for dispatch to conclude (for any reason: ok/exn/cancel; we only care about the channel being clear) | |
member __.AwaitCompletion() = | |
Async.FromContinuations(fun (cont, _, _) -> | |
task.Value.ContinueWith(fun (_ : System.Threading.Tasks.Task<'Res>) -> cont ()) | |
|> ignore) | |
/// Manages concurrent work such that requests arriving while a batch is in flight converge to wait for the next window | |
type AsyncBatchingGate<'Req, 'Res>(dispatch : 'Req[] -> Async<'Res>, ?linger) = | |
let mkBatch () = AsyncBatch(dispatch, defaultArg linger (System.TimeSpan.FromMilliseconds 5.)) | |
let mutable cell = mkBatch() | |
member __.Execute req = async { | |
let current = cell | |
// If current has not yet been dispatched, hop on and join | |
if current.TryAdd req then | |
return! current.AwaitResult() | |
else // Any thread that discovers a batch in flight, needs to wait for it to conclude first | |
do! current.AwaitCompletion() | |
// where competing threads discover a closed flight, we only want a single one to regenerate it | |
let _ = System.Threading.Interlocked.CompareExchange(&cell, mkBatch (), current) | |
return! __.Execute req | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
now in jet/equinox#228 (with tests and handling for exn from TryAdd)