Skip to content

Instantly share code, notes, and snippets.

@akhansari
Last active March 3, 2022 11:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akhansari/d88812b742aa6be1c35b4f46bd9f8532 to your computer and use it in GitHub Desktop.
Save akhansari/d88812b742aa6be1c35b4f46bd9f8532 to your computer and use it in GitHub Desktop.
F#: IEnumerable to IAsyncEnumerable
module AsyncSeq =
open System.Collections.Generic
open System.Threading.Tasks
let cancelled (cancellationToken: CancellationToken) =
Task.FromCanceled<bool> cancellationToken
|> ValueTask<bool>
let ofSeq (sq: Task<'T> seq) = {
new IAsyncEnumerable<'T> with
member _.GetAsyncEnumerator cancellationToken =
let enumerator = sq.GetEnumerator()
let mutable current = Unchecked.defaultof<_>
{ new IAsyncEnumerator<'T> with
member _.Current =
current
member _.DisposeAsync() =
enumerator.Dispose()
ValueTask.CompletedTask
member _.MoveNextAsync() =
if cancellationToken.IsCancellationRequested then
cancelled cancellationToken
else
task {
if enumerator.MoveNext() then
let! res = enumerator.Current
current <- res
return true
else
return false
}
|> ValueTask<bool>
}
}
let ofSeqMap mapping (sq: 'T seq) = {
new IAsyncEnumerable<'U> with
member _.GetAsyncEnumerator cancellationToken =
let enumerator = sq.GetEnumerator()
let mutable current = Unchecked.defaultof<_>
{ new IAsyncEnumerator<'U> with
member _.Current =
current
member _.DisposeAsync() =
enumerator.Dispose()
ValueTask.CompletedTask
member _.MoveNextAsync() =
if cancellationToken.IsCancellationRequested then
cancelled cancellationToken
else
task {
if enumerator.MoveNext() then
let! res = mapping enumerator.Current
current <- res
return true
else
return false
}
|> ValueTask<bool>
}
}
let iter action (asyncSeq: IAsyncEnumerable<'T>) =
let rec iter (enumerator: IAsyncEnumerator<'T>) =
task {
let! go = enumerator.MoveNextAsync()
if go then
do! action enumerator.Current
return! iter enumerator
}
task {
use enumerator = asyncSeq.GetAsyncEnumerator()
do! iter enumerator
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment