Last active
February 27, 2018 10:33
-
-
Save Szer/e9039e19c407a496efde44126c349626 to your computer and use it in GitHub Desktop.
WorkerPoolHopac
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#I @"C:\Users\___\.nuget\packages\hopac\0.3.23\lib\netstandard1.6" | |
#r "Hopac.dll" | |
#r "Hopac.Core.dll" | |
#r "Hopac.Platform.dll" | |
open System | |
open Hopac | |
open Hopac.Infixes | |
open Hopac.Stream | |
/// A worker pool that runs queued work items as Hopac jobs while keeping at
/// most `degree` of them in flight at the same time.
///
/// worker: turns an input item into the job that processes it.
/// degree: optional concurrency limit; when omitted it defaults to twice
///         Environment.ProcessorCount. It can be changed later via SetDegree.
type Pool<'i,'o>(worker: 'i -> Job<'o>, ?degree) =
    // The parentheses matter: without them the expression parses as
    // `(defaultArg degree Environment.ProcessorCount) * 2`, which would also
    // double a degree explicitly supplied by the caller.
    let maxUsage = defaultArg degree (Environment.ProcessorCount * 2)

    // Work items waiting to be picked up by the server loop.
    let inputCh = Ch()
    // Requests to change the concurrency limit.
    let degreeCh = Ch()
    // Completion notifications (Choice1Of2 result / Choice2Of2 exception).
    let doneCh = Ch()

    let goodOutput = Src.create()
    let badOutput = Src.create()

    // Routes a finished item's outcome to the matching output source.
    let publish = function
        | Choice1Of2 x -> Src.value goodOutput x
        | Choice2Of2 e -> Src.value badOutput e

    // Job that processes a single item and reports its outcome on doneCh.
    // Job.delay postpones the call to `worker` until the job actually runs, so
    // an exception raised by `worker` itself (before it returns a job) is
    // captured by Job.catch as well, instead of escaping into the server loop.
    let runItem item =
        Job.catch (Job.delay (fun () -> worker item)) >>= Ch.give doneCh

    // Server loop. `usage` is the number of items currently in flight and
    // `degree` is the current concurrency limit.
    let rec loop usage degree =
        // New items are accepted only while there is spare capacity.
        let accept =
            if usage < degree then
                inputCh ^=> (fun item ->
                    Job.queue (runItem item)
                    >>= fun () -> loop (usage + 1) degree)
            else
                Alt.never()
        // A new limit replaces `degree`; `usage` is carried over unchanged.
        degreeCh ^=> loop usage
        // A finished item is published first, then its slot is released.
        <|> doneCh ^=> (publish >=> fun () -> loop (usage - 1) degree)
        <|> accept

    do loop 0 maxUsage |> server

    /// Asks the pool to use `degree` as its new concurrency limit. The request
    /// is queued, so the call does not wait for the pool to apply it.
    member __.SetDegree degree = degreeCh *<- degree |> queue

    /// Hands `workItem` to the pool. The hand-off is queued, so the call does
    /// not wait for the pool to accept the item.
    member __.Queue workItem = inputCh *<- workItem |> queue

    /// Stream of results produced by work items that completed successfully.
    member __.Output = Src.tap goodOutput

    /// Stream of exceptions raised by work items that failed.
    member __.Errors = Src.tap badOutput
//Usage example | |
/// Sample worker: waits one second, then yields `i * 2`, or fails when the
/// current UTC tick count happens to be divisible by 7 (a crude stand-in for
/// random errors).
let work i =
    let pause = timeOutMillis 1000
    pause
    |> Job.map (fun _ ->
        match DateTime.UtcNow.Ticks % 7L with
        | 0L -> failwithf "error with %d" i //random error sim
        | _ -> i * 2)
// Pool built around `work`; no degree is given, so the pool's default applies.
let pool = Pool work

// Print every successful result as it is published (non-blocking).
queue (Stream.iterFun (fun result -> printfn "success: %d" result) pool.Output)

// Print every failure as it is published (non-blocking).
queue (Stream.iterFun (fun failure -> printfn "error: %A" failure) pool.Errors)

// Feed the numbers 1 through 100 into the pool (non-blocking).
queue (Stream.iterFun (fun item -> pool.Queue item) (Stream.ofSeq (seq { 1 .. 100 })))

// Ask the pool to drop to a single concurrent worker (non-blocking).
pool.SetDegree 1
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment