Skip to content

Instantly share code, notes, and snippets.

@sabine
Created August 5, 2023 10:40
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sabine/d0c31da025de86a6fff7d26d165cf0c6 to your computer and use it in GitHub Desktop.
Save sabine/d0c31da025de86a6fff7d26d165cf0c6 to your computer and use it in GitHub Desktop.
Run 25 tasks at a time with Eio at maximum concurrency while capturing errors
open Eio
let process item = if Random.int 4 = 0 then Error (`Item item) else Ok ()
let () =
Eio_main.run @@ fun env ->
let clock = Eio.Stdenv.clock env in
(* stream with capacity of 25 limits concurrent execution *)
let stream = Eio.Stream.create 25 in
let errors = ref [] in
let process_item i () =
(* writing to the stream blocks when full *)
Stream.add stream ();
(* report queue length and wait, to demonstrate we stay at max capacity *)
traceln "queue length: %d " (Stream.length stream);
Eio.Time.sleep clock 0.01;
(match process i with
| Error (`Item i) ->
errors := i :: !errors
| _ -> ()
);
(* take an element out to make room for another task to start *)
Stream.take stream;
Fiber.yield ()
in
List.init 1000 process_item |> Fiber.all;
(* demonstrate that the errors have been captured *)
!errors |> List.map string_of_int |> String.concat "," |> print_endline
@sabine
Copy link
Author

sabine commented Aug 5, 2023

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment