@CarstenKoenig
Created June 4, 2013 05:54
the changed AwaitObservable method
/// Creates an asynchronous workflow that will be resumed when the
/// specified observable produces a value. The workflow will return
/// the value produced by the observable.
static member AwaitObservable(observable : IObservable<'T1>) =
    let removeObj : IDisposable option ref = ref None
    let removeLock = new obj()
    let setRemover r =
        lock removeLock (fun () -> removeObj := Some r)
    let remove() =
        lock removeLock (fun () ->
            match !removeObj with
            | Some d ->
                removeObj := None
                d.Dispose()
            | None -> ())
    synchronize (fun f ->
        let workflow =
            Async.FromContinuations((fun (cont,econt,ccont) ->
                let rec finish cont value =
                    remove()
                    f (fun () -> cont value)
                setRemover <|
                    observable.Subscribe
                        ({ new IObserver<_> with
                            member x.OnNext(v) = finish cont v
                            member x.OnError(e) = finish econt e
                            member x.OnCompleted() =
                                let msg = "Cancelling the workflow, because the Observable awaited using AwaitObservable has completed."
                                finish ccont (new System.OperationCanceledException(msg)) })
                () ))
        async {
            let! cToken = Async.CancellationToken
            let token : CancellationToken = cToken
            use registration = token.Register(fun () -> remove())
            return! workflow
        })
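To see the subscribe/dispose pattern in isolation, here is a minimal sketch of how such a workflow might be used. It is not the gist's code: `awaitObservable` is a hypothetical, simplified stand-in that leaves out the `synchronize` helper (which is defined elsewhere), and the `Event<int>` source is only an assumption made for the example.

```fsharp
open System
open System.Threading

// Hypothetical simplified variant of AwaitObservable: same idea
// (subscribe, resume on the first notification, dispose the subscription),
// but without the `synchronize` helper used in the gist.
let awaitObservable (observable : IObservable<'T>) : Async<'T> =
    let subscription : IDisposable option ref = ref None
    let gate = obj()
    let setRemover d =
        lock gate (fun () -> subscription.Value <- Some d)
    let remove () =
        lock gate (fun () ->
            match subscription.Value with
            | Some d ->
                subscription.Value <- None
                d.Dispose()
            | None -> ())
    let workflow =
        Async.FromContinuations(fun (cont, econt, ccont) ->
            // Drop the subscription first, then resume the workflow.
            let finish k value =
                remove ()
                k value
            setRemover (
                observable.Subscribe(
                    { new IObserver<'T> with
                        member x.OnNext(v) = finish cont v
                        member x.OnError(e) = finish econt e
                        member x.OnCompleted() =
                            finish ccont (OperationCanceledException("Observable completed.")) })))
    async {
        let! ct = Async.CancellationToken
        // Unsubscribe as well if the workflow is cancelled while waiting.
        use registration = ct.Register(fun () -> remove ())
        return! workflow
    }

// Usage: an Event<int> serves as the observable source (an assumption for this sketch).
let source = Event<int>()
let received = ref 0

// StartWithContinuations runs the workflow on the current thread,
// so the subscription exists before the first Trigger call.
Async.StartWithContinuations(
    awaitObservable (source.Publish),
    (fun v -> received.Value <- v),
    ignore,
    ignore)

source.Trigger 42  // resumes the workflow with 42
source.Trigger 7   // no effect here: the subscription was already disposed
```

Only the first notification resumes the workflow; later values are ignored because the subscription is disposed before the continuation runs.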
@CarstenKoenig
Author

BTW: you might ask why I first grab cToken and then create a new binding, token, for it. If I try to use cToken directly, I get a compiler error (it wants a type declaration on cToken, and you cannot do this with let!). It seems you don't really need the token = ... binding itself, just another line in between. Really strange.
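For comparison, here is a small sketch of a variant that puts the type annotation on the `let!` pattern itself, which recent F# compilers accept when the pattern is parenthesized. `withCancelHook` is a hypothetical helper, not part of the gist, and whether this form avoids the compiler error inside the original type extension has not been verified.

```fsharp
open System.Threading

// Hypothetical helper: runs `onCancel` if the workflow's cancellation
// token fires while `work` is still running.
let withCancelHook (onCancel : unit -> unit) (work : Async<'T>) : Async<'T> =
    async {
        // Annotated pattern in let!; the parentheses are required.
        let! (token : CancellationToken) = Async.CancellationToken
        use registration = token.Register(fun () -> onCancel ())
        return! work
    }

// Usage: without cancellation the hook never runs and the result passes through.
let result = withCancelHook ignore (async { return 1 }) |> Async.RunSynchronously
```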
