Skip to content

Instantly share code, notes, and snippets.

@Szer
Last active February 27, 2018 10:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Szer/e9039e19c407a496efde44126c349626 to your computer and use it in GitHub Desktop.
Save Szer/e9039e19c407a496efde44126c349626 to your computer and use it in GitHub Desktop.
WorkerPoolHopac
#I @"C:\Users\___\.nuget\packages\hopac\0.3.23\lib\netstandard1.6"
#r "Hopac.dll"
#r "Hopac.Core.dll"
#r "Hopac.Platform.dll"
open System
open Hopac
open Hopac.Infixes
open Hopac.Stream
/// A bounded worker pool driven by a single Hopac server loop.
///
/// Work items submitted through `Queue` are handed to `worker`, and each one
/// is run as a separately queued job. A new item is only accepted while the
/// number of in-flight workers is below the current degree. Successful
/// results are published on `Output`; exceptions caught from workers are
/// published on `Errors`.
///
/// Parameters:
///   worker - builds the job that processes one work item.
///   degree - optional maximum number of workers in flight at once;
///            defaults to twice `Environment.ProcessorCount`.
type Pool<'i,'o>(worker: 'i -> Job<'o>, ?degree) =
    // The parentheses matter: function application binds tighter than `*`,
    // so without them an explicitly supplied degree would be doubled as well,
    // which disagrees with `SetDegree`, where the value is applied as given.
    let maxUsage = defaultArg degree (Environment.ProcessorCount * 2)
    let inputCh, degreeCh, doneCh = Ch(), Ch(), Ch()
    let goodOutput = Src.create()
    let badOutput = Src.create()

    // Routes a finished worker's outcome to the matching output source.
    let publish = function
        | Choice1Of2 x -> Src.value goodOutput x
        | Choice2Of2 e -> Src.value badOutput e

    // Builds the job that starts one work item as a separately queued job.
    // `Job.delay` defers the call to `worker` into that job, so an exception
    // raised while constructing the worker's job is captured by `Job.catch`
    // and reported like any other failure instead of escaping into the
    // server loop.
    let launch item =
        Job.delay (fun () -> worker item)
        |> Job.catch
        >>= Ch.give doneCh
        |> Job.queue

    // Server loop state: `usage` is the number of workers currently in
    // flight, `degree` is the current limit. The alternatives are combined
    // in the same order as before: degree change, worker completion, input.
    let rec loop usage degree =
        // A new degree replaces the limit; running workers are left alone.
        let onDegree = degreeCh ^=> loop usage
        // A worker finished: publish its outcome and release its slot.
        let onDone = doneCh ^=> (publish >=> fun _ -> loop (usage - 1) degree)
        // Accept new input only while there is spare capacity.
        let onInput =
            if usage < degree then
                inputCh ^=> (launch >=> fun _ -> loop (usage + 1) degree)
            else
                Alt.never()
        onDegree <|> onDone <|> onInput

    do loop 0 maxUsage |> server

    /// Queues a request to change the maximum number of in-flight workers.
    member __.SetDegree degree = degreeCh *<- degree |> queue
    /// Queues a work item for processing by the pool.
    member __.Queue workItem = inputCh *<- workItem |> queue
    /// Stream of successful worker results, tapped at the time of access.
    member __.Output = Src.tap goodOutput
    /// Stream of exceptions caught from workers, tapped at the time of access.
    member __.Errors = Src.tap badOutput
//Usage example
/// Demo worker: waits 1000 ms, then yields `i * 2`.
/// To simulate random errors, it fails with "error with <i>" whenever the
/// current UTC tick count happens to be divisible by 7.
let work i =
    // Runs once the timeout has elapsed.
    let afterDelay _ =
        match DateTime.UtcNow.Ticks % 7L with
        | 0L -> failwithf "error with %d" i
        | _ -> i * 2
    timeOutMillis 1000 >>- afterDelay
// Build a pool over the demo worker; no explicit degree is passed.
let pool = Pool work

// Print each successful result as it is published (non-blocking).
queue (Stream.iterFun (printfn "success: %d") pool.Output)

// Print each exception caught from a worker (non-blocking).
queue (Stream.iterFun (printfn "error: %A") pool.Errors)

// Submit the items 1..100 to the pool (non-blocking).
queue (Stream.iterFun pool.Queue (Stream.ofSeq (seq { 1 .. 100 })))

// Request a degree of 1 (non-blocking).
pool.SetDegree 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment