Skip to content

Instantly share code, notes, and snippets.

@Szer
Last active March 5, 2022 18:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Szer/a8f4ac7921ac4ed4f0a0a777602bb67a to your computer and use it in GitHub Desktop.
Save Szer/a8f4ac7921ac4ed4f0a0a777602bb67a to your computer and use it in GitHub Desktop.
#r "nuget: FSharp.Control.AsyncSeq, 3.2.1"
open System
open System.Threading.Channels
open System.Threading.Tasks
open FSharp.Control
// это наша асинхронная операция
let doWork x = task {
do! Task.Delay 100
return x + 1
}
// обработка входных данных с заданным количеством потоков
let processWork workersCount workFun workToDo =
// этот ченел будет считывать данные из workToDo
let serverChannel = Channel.CreateBounded 1
// это ченел куда воркеры будут скидывать результаты
let resultChannel = Channel.CreateBounded 1
// читаем входные данные и отправляем их на обработку
let _ = task {
for workUnit in workToDo do
do! serverChannel.Writer.WriteAsync workUnit
serverChannel.Writer.TryComplete() |> ignore
}
// кучка воркеров которые будут обрабатывать данные
let workers = Array.init workersCount (fun _ ->
task {
while not serverChannel.Reader.Completion.IsCompleted do
try
let! workUnit = serverChannel.Reader.ReadAsync()
let! result = workFun workUnit
do! resultChannel.Writer.WriteAsync result
with
| :? ChannelClosedException -> ()
} :> Task
)
// когда все воркеры закончили, закрываем ченел с результатами чтобы IAsyncEnumerable завершился
let _ = task {
do! Task.WhenAll workers
resultChannel.Writer.TryComplete() |> ignore
}
// на выход просто отдаём IAsyncEnumerable
resultChannel.Reader.ReadAllAsync()
[1..100]
|> processWork 5 doWork
|> AsyncSeq.ofAsyncEnum
|> AsyncSeq.iter Console.WriteLine
|> Async.RunSynchronously
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment