Last active
March 3, 2022 11:50
-
-
Save akhansari/d88812b742aa6be1c35b4f46bd9f8532 to your computer and use it in GitHub Desktop.
F#: IEnumerable to IAsyncEnumerable
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Adapters from synchronous sequences to `IAsyncEnumerable<'T>`, plus a
/// task-based consumer for asynchronous sequences.
module AsyncSeq =

    open System.Collections.Generic
    open System.Threading
    open System.Threading.Tasks

    /// Wraps `Task.FromCanceled<bool>` for `cancellationToken` in a `ValueTask<bool>`.
    /// Within this module it is only called after `IsCancellationRequested` has
    /// been observed to be true (`Task.FromCanceled` rejects a token that has
    /// not been cancelled).
    let cancelled (cancellationToken: CancellationToken) =
        Task.FromCanceled<bool> cancellationToken
        |> ValueTask<bool>

    /// Shared implementation behind `ofSeq` and `ofSeqMap`.
    /// Each `MoveNextAsync` advances the underlying synchronous enumerator by one
    /// element, awaits `mapping element`, and stores the awaited value as `Current`.
    let private ofSeqWith (mapping: 'T -> Task<'U>) (sq: 'T seq) =
        { new IAsyncEnumerable<'U> with
            member _.GetAsyncEnumerator cancellationToken =
                let enumerator = sq.GetEnumerator()
                // Holds the most recently awaited element; it is the default
                // value of 'U until the first successful MoveNextAsync.
                let mutable current = Unchecked.defaultof<'U>

                { new IAsyncEnumerator<'U> with
                    member _.Current = current

                    member _.DisposeAsync() =
                        // The wrapped enumerator is synchronous, so disposal
                        // completes immediately.
                        enumerator.Dispose()
                        ValueTask.CompletedTask

                    member _.MoveNextAsync() =
                        // Cancellation is checked once per step, before the
                        // source enumerator is touched.
                        if cancellationToken.IsCancellationRequested then
                            cancelled cancellationToken
                        else
                            task {
                                if enumerator.MoveNext() then
                                    let! res = mapping enumerator.Current
                                    current <- res
                                    return true
                                else
                                    return false
                            }
                            |> ValueTask<bool> } }

    /// Exposes a sequence of tasks as an `IAsyncEnumerable<'T>`.
    /// Tasks are awaited one at a time, in sequence order, as the consumer
    /// calls `MoveNextAsync`.
    let ofSeq (sq: Task<'T> seq) =
        ofSeqWith id sq

    /// Exposes `sq` as an `IAsyncEnumerable<'U>` by applying the task-returning
    /// `mapping` to each element and awaiting its result, one element per
    /// `MoveNextAsync` call.
    let ofSeqMap mapping (sq: 'T seq) =
        ofSeqWith mapping sq

    /// Runs the task-returning `action` on every element of `asyncSeq`, in order,
    /// awaiting each invocation before requesting the next element.
    /// The enumerator is obtained without passing a cancellation token and is
    /// disposed when the returned task finishes.
    let iter action (asyncSeq: IAsyncEnumerable<'T>) =
        task {
            use enumerator = asyncSeq.GetAsyncEnumerator()
            // A loop is used instead of `return!` recursion: task expressions
            // have no tail calls, so recursing once per element builds an
            // unbounded chain of nested tasks / stack frames on long sequences.
            let mutable go = true

            while go do
                let! hasNext = enumerator.MoveNextAsync()

                if hasNext then
                    do! action enumerator.Current
                else
                    go <- false
        }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment