Skip to content

Instantly share code, notes, and snippets.

@bartelink
Created December 11, 2020 16:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bartelink/14b7a1474e5376b59e62ddf8caacdb44 to your computer and use it in GitHub Desktop.
Save bartelink/14b7a1474e5376b59e62ddf8caacdb44 to your computer and use it in GitHub Desktop.
AwaitWithStopOnCancellation re https://github.com/jet/FsKafka/pull/83
(*
let a = async {
let! ct = Async.CancellationToken
do! System.Threading.Tasks.Task.Delay(5000,ct) |> Async.AwaitTask
}
let b = async {
failwith "B throw"
}
Async.Parallel[a;b] |> Async.RunSynchronously
*)
module Demo =
open System
open System.Threading.Tasks
type Async with
    /// Adapts a `Task<'T>` into an `Async<'T>` by attaching a continuation to the task.
    /// - Faulted task: the workflow resumes through the error continuation with the sole
    ///   inner exception when there is exactly one, otherwise with the AggregateException.
    /// - Cancelled task: also reported through the error continuation, as a
    ///   TaskCanceledException.
    /// - Completed task: the workflow resumes with the task's result.
    /// The cancellation continuation supplied by FromContinuations is never invoked.
    static member AwaitTaskCorrect (task : Task<'T>) : Async<'T> =
        Async.FromContinuations(fun (onResult, onError, _) ->
            let continuation =
                task.ContinueWith(fun (finished : Task<'T>) ->
                    if finished.IsFaulted then
                        let failure = finished.Exception
                        match failure.InnerExceptions.Count with
                        | 1 -> onError failure.InnerExceptions.[0]
                        | _ -> onError failure
                    elif finished.IsCanceled then
                        onError (TaskCanceledException("Task wrapped with Async has been cancelled."))
                    elif finished.IsCompleted then
                        onResult finished.Result
                    else
                        onError (Exception "invalid Task state!"))
            // The Task returned by ContinueWith is not needed.
            ignore continuation)

    /// Non-generic counterpart of the overload above: adapts a `Task` into an
    /// `Async<unit>`, with the same handling of faulted and cancelled tasks.
    static member AwaitTaskCorrect (task : Task) : Async<unit> =
        Async.FromContinuations(fun (onDone, onError, _) ->
            let continuation =
                task.ContinueWith(fun (finished : Task) ->
                    if finished.IsFaulted then
                        let failure = finished.Exception
                        match failure.InnerExceptions.Count with
                        | 1 -> onError failure.InnerExceptions.[0]
                        | _ -> onError failure
                    elif finished.IsCanceled then
                        onError (TaskCanceledException("Task wrapped with Async has been cancelled."))
                    elif finished.IsCompleted then
                        onDone ()
                    else
                        onError (Exception "invalid Task state!"))
            // The Task returned by ContinueWith is not needed.
            ignore continuation)
/// Demo stand-in for a background consumer. It wraps a running task, a callback
/// that asks the task to stop, and a display name used in console messages.
type Consumer(task, stop, name) =
    // Disposing only invokes the stop callback; it does not wait for the task.
    interface System.IDisposable with
        member this.Dispose() = stop ()

    /// Starts a background loop that checks a private CancellationTokenSource every
    /// 100 ms and prints "Stopped <name>" once cancellation has been requested.
    /// Returns a Consumer wrapping the loop's task and a callback that cancels it.
    static member Start(name) =
        let cancellation = new System.Threading.CancellationTokenSource()
        let requestStop () = cancellation.Cancel()
        let rec pollUntilStopped () = async {
            if cancellation.IsCancellationRequested then
                printfn "Stopped %s" name
            else
                do! Async.Sleep 100
                return! pollUntilStopped ()
        }
        let running = Async.StartAsTask(pollUntilStopped ())
        new Consumer(running, requestStop, name)

    /// Logs "Stopping <name>" and then invokes the stop callback.
    member this.Stop() =
        printfn "Stopping %s" name
        stop ()

    /// Async that completes when the wrapped task has finished.
    member this.AwaitShutdown() =
        // NOTE: deliberately not `Async.AwaitTask task`; per the original author,
        // that hangs when the consumer is terminated via `Stop()`.
        Async.AwaitTaskCorrect task
/// First variant of the demo: starts two consumers, registers a callback on this
/// workflow's own cancellation token that stops both, then awaits both shutdowns
/// in parallel with a computation that fails immediately.
/// NOTE(review): the callback is tied to the outer token, not to the token each
/// Async.Parallel child sees. Presumably this is the variant that can hang when
/// the failing sibling throws, which AwaitWithStopOnCancellation addresses. Confirm.
let example = async {
    use first = Consumer.Start("c1")
    use second = Consumer.Start("c2")
    let! token = Async.CancellationToken
    // Kept alive until the end of the workflow; stops both consumers on cancellation.
    use _stopBoth =
        token.Register(fun () ->
            first.Stop()
            second.Stop())
    printfn "Waiting..."
    let failing : Async<unit> = async { failwith "Stop the world!" }
    let work = [ failing; first.AwaitShutdown(); second.AwaitShutdown() ]
    do! work |> Async.Parallel |> Async.Ignore<unit[]>
    printfn "Finished"
}
[<AutoOpen>]
module ConsumerExt =
    type Consumer with
        /// Awaits the consumer's shutdown. While waiting, a callback registered on
        /// the current workflow's cancellation token calls `Stop()` on the consumer
        /// if that token is cancelled.
        member this.AwaitWithStopOnCancellation() =
            async {
                let! token = Async.CancellationToken
                // `use` keeps the registration alive until AwaitShutdown has completed.
                use _stopOnCancel = token.Register(fun () -> this.Stop())
                return! this.AwaitShutdown()
            }
/// Second variant of the demo: starts two consumers and awaits each one via
/// AwaitWithStopOnCancellation, in parallel with a computation that fails
/// immediately. Each await registers its own stop callback on the cancellation
/// token it observes.
let example2 = async {
    use first = Consumer.Start("c1")
    use second = Consumer.Start("c2")
    printfn "Waiting..."
    let failing : Async<unit> = async { failwith "Stop the world; kick the people off!" }
    let work =
        [ failing
          first.AwaitWithStopOnCancellation()
          second.AwaitWithStopOnCancellation() ]
    do! work |> Async.Parallel |> Async.Ignore<unit[]>
    printfn "Finished"
}
// Script entry point: runs the example2 workflow and blocks until it finishes.
// NOTE(review): example2 contains a computation that always fails, so this call
// is presumably expected to end by raising that failure. Confirm.
Async.RunSynchronously example2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment