Created
June 4, 2013 05:54
-
-
Save CarstenKoenig/5703876 to your computer and use it in GitHub Desktop.
the changed AwaitObservable method
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Creates an asynchronous workflow that will be resumed when the
/// specified observable produces a value. The workflow will return
/// the value produced by the observable.
///
/// The first notification received decides the outcome:
///  - OnNext(v)     resumes the workflow with `v`;
///  - OnError(e)    resumes it through the exception continuation;
///  - OnCompleted() resumes it through the cancellation continuation
///                  with an OperationCanceledException.
/// Any later notification is ignored. The subscription is disposed as soon
/// as the first notification arrives or the workflow's cancellation token
/// fires, whichever happens first.
///
/// NOTE(review): `synchronize` is defined elsewhere in this file. It hands
/// this code a function `f` that is used to invoke the chosen continuation;
/// presumably it marshals the call back to the context captured when
/// AwaitObservable was called - confirm against its definition.
static member AwaitObservable(observable : IObservable<'T1>) =
    synchronize (fun f ->
        async {
            // Grabbing the token in two steps is deliberate: the gist author
            // reports a compiler error when `cToken` is used directly.
            let! cToken = Async.CancellationToken
            let token : CancellationToken = cToken

            // All bookkeeping lives inside the workflow so that every run of
            // the returned Async gets its own state and cannot interfere with
            // another run.
            let gate = new obj()
            let subscription : IDisposable option ref = ref None
            // Set once the subscription is no longer wanted (a notification
            // was handled or the token was cancelled).
            let removed = ref false
            // Set once a continuation has been chosen.
            let finished = ref false

            // Disposes the stored subscription (if any) and remembers that
            // the subscription is no longer wanted. Safe to call repeatedly.
            let remove() =
                lock gate (fun () ->
                    removed := true
                    match !subscription with
                    | Some d ->
                        subscription := None
                        d.Dispose()
                    | None -> ())

            // Stores the subscription handle. If `remove` already ran - which
            // happens when the observable notifies synchronously from inside
            // Subscribe, before the handle exists - dispose it right away
            // instead of leaking it.
            let setRemover (r : IDisposable) =
                lock gate (fun () ->
                    if !removed then r.Dispose()
                    else subscription := Some r)

            use registration = token.Register(fun () -> remove())

            return!
                Async.FromContinuations(fun (cont, econt, ccont) ->
                    // Only the first notification may resume the workflow:
                    // FromContinuations expects exactly one continuation call.
                    let finish cont value =
                        let isFirst =
                            lock gate (fun () ->
                                if !finished then false
                                else
                                    finished := true
                                    true)
                        if isFirst then
                            remove()
                            f (fun () -> cont value)
                    setRemover <|
                        observable.Subscribe
                            ({ new IObserver<_> with
                                member x.OnNext(v) = finish cont v
                                member x.OnError(e) = finish econt e
                                member x.OnCompleted() =
                                    let msg = "Cancelling the workflow, because the Observable awaited using AwaitObservable has completed."
                                    finish ccont (new System.OperationCanceledException(msg)) }))
        })
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
BTW: you might ask why I first grab cToken and then just create a new binding, token, for it. If I try to use cToken directly I get a compiler error (it wants a type declaration on cToken, and you cannot do this with let!). It seems you don't really need the `token = ...` binding as such, just another line in between - really strange.