Skip to content

Instantly share code, notes, and snippets.

@dbrattli
Created December 16, 2018 14:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dbrattli/1d6b187a58d94374a2b4db0ffe2edac1 to your computer and use it in GitHub Desktop.
Save dbrattli/1d6b187a58d94374a2b4db0ffe2edac1 to your computer and use it in GitHub Desktop.
Convert Async Obserable to Async Enumerable
/// Convert async observable to async sequence, non-blocking.
/// Producer will be awaited until item is consumed by the async
/// enumerator.
let toAsyncSeq (source: IAsyncObservable<'a>) : AsyncSeq<'a> =
let ping = new AutoResetEvent false
let pong = new AutoResetEvent false
let mutable latest : Notification<'a> = OnCompleted
let _obv n =
async {
latest <- n
ping.Set () |> ignore
do! Async.AwaitWaitHandle pong |> Async.Ignore
}
asyncSeq {
let! dispose = AsyncObserver _obv |> source.SubscribeAsync
let mutable running = true
while running do
do! Async.AwaitWaitHandle ping |> Async.Ignore
match latest with
| OnNext x ->
yield x
| OnError ex ->
running <- false
raise ex
| OnCompleted ->
running <- false
pong.Set () |> ignore
do! dispose.DisposeAsync ()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment