Skip to content

Instantly share code, notes, and snippets.

@Nymphium
Last active February 8, 2023 14:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Nymphium/95d5f503165ab640df90ec4e3475fb9b to your computer and use it in GitHub Desktop.
Save Nymphium/95d5f503165ab640df90ec4e3475fb9b to your computer and use it in GitHub Desktop.
[@@@alert "-unstable"]
[@@@warning "-32"]
(* Reimplementation of Go's worker pools using Eio and Domainslib.Chan
https://gobyexample.com/worker-pools
*)
(* Ambient access to the Eio environment and the enclosing switch.
   The pair is supplied through an effect handler rather than being threaded
   through every function as explicit arguments. *)
module Stdenv = struct
  (* Effect whose answer is the [(env, sw)] pair installed by [run]. *)
  type _ Effect.t += Get : (Eio.Stdenv.t * Eio.Switch.t) Effect.t

  (* [get ()] performs [Get]; it is answered by the handler set up in [run]. *)
  let get () = Effect.perform Get

  (* [run ~env ~sw f] evaluates [f ()] under a handler that resumes every
     [Get] with [(env, sw)]. Any other effect is not handled here
     (the handler returns [None] for it). *)
  let run ~env ~sw f =
    let effc (type a) (eff : a Effect.t)
      : ((a, 'r) Effect.Deep.continuation -> 'r) option
      =
      match eff with
      | Get -> Some (fun k -> Effect.Deep.continue k (env, sw))
      | _ -> None
    in
    Effect.Deep.try_with f () { effc }
  ;;
end
(* Ambient access to an Eio clock, provided through an effect handler. *)
module Clock = struct
  (* Effect whose answer is the clock installed by [run]. *)
  type _ Effect.t += Get : Eio.Time.clock Effect.t

  (* [get ()] performs [Get]; it is answered by the handler set up in [run]. *)
  let get () = Effect.perform Get

  (* [sleep duration] fetches the ambient clock and calls [Eio.Time.sleep]
     on it with [duration]. *)
  let sleep duration =
    let clock = get () in
    Eio.Time.sleep clock duration
  ;;

  (* [now ()] fetches the ambient clock and returns [Eio.Time.now] of it. *)
  let now () =
    let clock = get () in
    Eio.Time.now clock
  ;;

  (* [run clock f] evaluates [f ()] under a handler that resumes every [Get]
     with [clock]. Any other effect is not handled here. *)
  let run (clock : Eio.Time.clock) f =
    let effc (type a) (eff : a Effect.t) =
      match eff with
      | Get ->
        Some
          (fun (k : (a, _) Effect.Deep.continuation) -> Effect.Deep.continue k clock)
      | _ -> None
    in
    Effect.Deep.try_with f () { effc }
  ;;
end
(* Fiber-friendly wrappers around [Domainslib.Chan].
   Instead of blocking, [recv] and [send] poll the channel and, between
   attempts, sleep on the ambient [Clock] and yield to other fibers. They
   therefore have to run under a [Clock.run] handler. *)
module Chan_async = struct
  module C = Domainslib.Chan

  (* [make ()] creates a bounded channel of capacity 2. *)
  let make () = C.make_bounded 2

  (* [recv chan] polls [chan] until a value is available and returns it.
     After each empty poll it sleeps 0.01 on the ambient clock and yields. *)
  let recv chan =
    let rec await = function
      | Some item -> item
      | None ->
        Clock.sleep 0.01;
        Eio.Fiber.yield ();
        await (C.recv_poll chan)
    in
    await (C.recv_poll chan)
  ;;

  (* [send chan v] retries [C.send_poll chan v] until it succeeds.
     After each failed attempt it sleeps 0.01 on the ambient clock and yields. *)
  let send chan v =
    while not (C.send_poll chan v) do
      Clock.sleep 0.01;
      Eio.Fiber.yield ()
    done
  ;;

  (* [drain chan] receives one value from [chan] and discards it. *)
  let drain chan = ignore (recv chan)

  (* [recv_forever chan f] forks a daemon fiber on the ambient switch (taken
     from [Stdenv.get]) that receives from [chan] in an endless loop and
     applies [f] to each value, discarding the result.
     A [Clock.run] handler is installed inside the fiber so that the
     [Clock.sleep] calls made by [recv] are answered there.
     The loop never returns, so [`Stop_daemon] is only there to satisfy
     the type of [fork_daemon]; warning 21 (nonreturning statement) is
     silenced for that reason. *)
  let[@warning "-21"] recv_forever chan f =
    let env, sw = Stdenv.get () in
    let rec loop () =
      ignore (f (recv chan));
      loop ()
    in
    Eio.Fiber.fork_daemon ~sw (fun () ->
      Clock.run env#clock loop;
      `Stop_daemon)
  ;;

  (* Operator aliases for the channel primitives. *)
  module Syntax = struct
    (* [chan <~ v] is [send chan v]. *)
    let ( <~ ) chan v = send chan v

    (* [~> chan] is [recv chan]. *)
    let ( ~> ) chan = recv chan

    (* [~>! chan] is [drain chan]. *)
    let ( ~>! ) chan = drain chan
  end
end
open Chan_async.Syntax
(* [worker id jobs results] starts a background consumer (through
   [Chan_async.recv_forever]) that, for every job number taken from [jobs],
   logs the start, sleeps for 1. on the ambient clock to simulate work,
   logs the end, and sends twice the job number to [results]. *)
let worker id jobs results =
  let handle job =
    Printf.printf "worker %d started job %d\n" id job;
    flush_all ();
    Clock.sleep 1.;
    Printf.printf "worker %d finished job %d\n" id job;
    flush_all ();
    Chan_async.send results (job * 2)
  in
  Chan_async.recv_forever jobs handle
;;
(* [main ()] runs the worker-pool demo: it starts 3 workers, feeds 5 jobs
   into the [jobs] channel, waits for 5 results, and prints the elapsed time
   as measured by the ambient clock. It must run under [Stdenv.run], since it
   obtains the Eio environment from [Stdenv.get]. *)
let main () =
  let env, _ = Stdenv.get () in
  let jobs = Chan_async.make () in
  let results = Chan_async.make () in
  let num_workers = 3 in
  let num_data = 5 in
  Clock.run env#clock (fun () ->
    let started_at = Clock.now () in
    (* Spawn the pool; each worker runs as a daemon fiber. *)
    for id = 1 to num_workers do
      worker id jobs results
    done;
    (* Submit every job. *)
    for job = 1 to num_data do
      jobs <~ job
    done;
    (* Wait for one result per submitted job, discarding the values. *)
    for _ = 1 to num_data do
      ~>!results
    done;
    let finished_at = Clock.now () in
    Printf.printf "%f sec\n" (finished_at -. started_at))
;;
(* Program entry point: start the Eio event loop, open a switch, and run
   [main] with the environment and switch made available via [Stdenv]. *)
let () =
  Eio_main.run (fun env ->
    Eio.Switch.run (fun sw -> Stdenv.run ~env ~sw main))
$ dune exec src/main.exe
Done: 87% (7/8, 1 left) (jobs: 1)worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 1 started job 4
worker 2 finished job 2
worker 2 started job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5
2.022990 sec
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment