@panesofglass
Created September 13, 2010 00:30
Create an Observable from an F# Async.
module AsyncEx

open System

type Async<'a> with
    /// Wraps this Async as an IObservable that runs it once per subscription.
    member this.ToObservable() =
        { new IObservable<_> with
            member x.Subscribe(o) =
                if o = null then nullArg "observer"
                let cts = new System.Threading.CancellationTokenSource()
                // Guard so that only the first of completion or Dispose touches cts.
                let invoked = ref 0
                let cancelOrDispose cancel =
                    if System.Threading.Interlocked.CompareExchange(invoked, 1, 0) = 0 then
                        if cancel then cts.Cancel() else cts.Dispose()
                let wrapper = async {
                    try
                        try
                            let! r = this
                            o.OnNext(r)
                            o.OnCompleted()
                        with e -> o.OnError(e)
                    finally cancelOrDispose false }
                Async.StartImmediate(wrapper, cts.Token)
                // Disposing the subscription cancels the computation if it is still running.
                { new IDisposable with member x.Dispose() = cancelOrDispose true } }
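
A minimal usage sketch, assuming the AsyncEx module above has been compiled or loaded into the session. The names `printer`, `work`, and `subscription` are hypothetical and exist only for illustration; they are not part of the gist.

```fsharp
open System
open AsyncEx

// Hypothetical observer that prints every notification it receives.
let printer =
    { new IObserver<int> with
        member x.OnNext(v) = printfn "OnNext %d" v
        member x.OnError(e) = printfn "OnError %s" e.Message
        member x.OnCompleted() = printfn "OnCompleted" }

// Hypothetical computation; any Async<int> would do here.
let work = async { return 42 }

// Each call to Subscribe starts the computation again.
let subscription = work.ToObservable().Subscribe(printer)

// Dispose requests cancellation if the computation is still running;
// once the computation has finished, the guard makes this call a no-op.
subscription.Dispose()
```

Because the extension uses Async.StartImmediate, the computation begins on the thread that calls Subscribe. An async that never yields, like the one in this sketch, should therefore deliver its notifications before Subscribe returns.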