Skip to content

Instantly share code, notes, and snippets.

@anmonteiro
Forked from EduardoRFS/Parallel_select_engine.re
Created February 22, 2021 03:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anmonteiro/5f7e21426fd48a7a0682bc0f1ba4fc34 to your computer and use it in GitHub Desktop.
Save anmonteiro/5f7e21426fd48a7a0682bc0f1ba4fc34 to your computer and use it in GitHub Desktop.
open Lwt_engine;
[@ocaml.warning "-3"];
module Lwt_sequence = Lwt_sequence;
[@ocaml.warning "+3"];
/* Finite maps keyed by Unix file descriptors. Keys are ordered with the
   generic structural [compare]. */
module Fd_map =
  Map.Make({
    type t = Unix.file_descr;
    let compare = (a: t, b: t) => compare(a, b);
  });
/* Type of a sleeper for the select engine: a wake-up deadline paired
   with the callback to run once that deadline has passed. */
type sleeper = {
mutable time: float,
/* The time at which the sleeper should be woken up. It is compared
   against [Unix.gettimeofday()], and is mutable so that a repeating
   timer can re-arm the same record with a new deadline. */
mutable stopped: bool,
/* [true] iff the event has been stopped. Stopped sleepers are removed
   from the queue without running their action. */
action: unit => unit,
/* The action for the sleeper, called with no argument once it is due. */
};
/* Priority queue of sleepers, ordered by their [time] field using the
   generic [compare] on floats. */
[@ocaml.warning "-3"]
module Sleep_queue =
  Lwt_pqueue.Make({
    type t = sleeper;
    let compare = (a, b) => compare(a.time, b.time);
  });
/* [get_next_timeout(sleep_queue)] is the number of seconds from now
   until the first non-stopped sleeper of [sleep_queue] is due, clamped
   below at [0.]. Stopped sleepers at the head of the queue are skipped.
   Returns [-1.] when no such sleeper is left. */
let rec get_next_timeout = sleep_queue =>
  switch (Sleep_queue.lookup_min(sleep_queue)) {
  | None => (-1.)
  | Some(next) =>
    if (next.stopped) {
      get_next_timeout(Sleep_queue.remove_min(sleep_queue));
    } else {
      max(0., next.time -. Unix.gettimeofday());
    }
  };
/* [bad_fd(fd)] is [true] iff [Unix.fstat(fd)] fails with a
   [Unix.Unix_error]; the stat result itself is not used. */
let bad_fd = fd =>
  switch (Unix.fstat(fd)) {
  | _stats => false
  | exception (Unix.Unix_error(_, _, _)) => true
  };
/* [restart_actions(sleep_queue, now)] pops sleepers from the front of
   [sleep_queue] while they are either stopped or due ([time <= now]),
   running the action of each due one, and returns the remaining queue.
   It stops at the first live sleeper whose deadline is still in the
   future. */
let rec restart_actions = (sleep_queue, now) =>
  switch (Sleep_queue.lookup_min(sleep_queue)) {
  | None => sleep_queue
  | Some(head) =>
    if (head.stopped) {
      /* Cancelled: drop it without running anything. */
      restart_actions(Sleep_queue.remove_min(sleep_queue), now);
    } else if (head.time <= now) {
      /* The sleeper must leave the queue before its action runs: the
         action may change the sleeper's time, which would break the
         priority queue invariant if it were still enqueued. */
      let remaining = Sleep_queue.remove_min(sleep_queue);
      head.action();
      restart_actions(remaining, now);
    } else {
      sleep_queue;
    }
  };
/* [invoke_actions(fd, map)] calls every callback registered for [fd]
   in [map], in the order given by [Lwt_sequence.iter_l]. It does
   nothing when [fd] has no binding. */
let invoke_actions = (fd, map) =>
  switch (Fd_map.find_opt(fd, map)) {
  | None => ()
  | Some(callbacks) => Lwt_sequence.iter_l(cb => cb(), callbacks)
  };
/* Bookkeeping shared by engines built on a select/poll style call. It
   holds the timer queue and the per-descriptor callback sequences, and
   implements the registration methods on top of them. Actually waiting
   on descriptors is left to subclasses. */
class virtual select_or_poll_based = {
as _;
inherit class abstract;
val mutable sleep_queue = Sleep_queue.empty;
/* Threads waiting for a timeout to expire. */
val mutable new_sleeps = [];
/* Sleepers added since the last iteration of the main loop:
They are not added immediately to the main sleep queue in order
to prevent them from being woken up immediately. */
val mutable wait_readable = Fd_map.empty;
/* Sequences of actions waiting for file descriptors to become
readable. */
val mutable wait_writable = Fd_map.empty;
/* Sequences of actions waiting for file descriptors to become
writable. */
/* No cleanup work is performed by this class. */
pri cleanup = ();
/* [register_timer(delay, repeat, f)] queues a sleeper due [delay]
   seconds from now whose action calls [f]. When [repeat] is [true] the
   sleeper re-arms itself each time it fires. The result is a lazy
   value; forcing it marks the sleeper as stopped. */
pri register_timer = (delay, repeat, f) =>
if (repeat) {
let rec sleeper = {
time: Unix.gettimeofday() +. delay,
stopped: false,
action: g,
}
and g = () => {
/* Re-arm before calling [f]: set the next deadline and put the sleeper
   back on [new_sleeps] so it is picked up by a later iteration. */
sleeper.time = Unix.gettimeofday() +. delay;
new_sleeps = [sleeper, ...new_sleeps];
f();
};
new_sleeps = [sleeper, ...new_sleeps];
lazy(sleeper.stopped = true);
} else {
/* One-shot timer: the action is [f] itself, with no re-arming. */
let sleeper = {
time: Unix.gettimeofday() +. delay,
stopped: false,
action: f,
};
new_sleeps = [sleeper, ...new_sleeps];
lazy(sleeper.stopped = true);
};
/* [register_readable(fd, f)] adds [f] to the sequence of callbacks
   stored for [fd] in [wait_readable], creating that sequence on first
   use. The result is a lazy value; forcing it removes the callback and,
   if the sequence is then empty, removes [fd] from [wait_readable]. */
pri register_readable = (fd, f) => {
let actions =
try(Fd_map.find(fd, wait_readable)) {
| Not_found =>
let actions = Lwt_sequence.create();
wait_readable = Fd_map.add(fd, actions, wait_readable);
actions;
};
let node = Lwt_sequence.add_l(f, actions);
lazy(
{
Lwt_sequence.remove(node);
if (Lwt_sequence.is_empty(actions)) {
wait_readable = Fd_map.remove(fd, wait_readable);
};
}
);
};
/* [register_writable(fd, f)] is the counterpart of
   [register_readable] operating on [wait_writable]. */
pri register_writable = (fd, f) => {
let actions =
try(Fd_map.find(fd, wait_writable)) {
| Not_found =>
let actions = Lwt_sequence.create();
wait_writable = Fd_map.add(fd, actions, wait_writable);
actions;
};
let node = Lwt_sequence.add_l(f, actions);
lazy(
{
Lwt_sequence.remove(node);
if (Lwt_sequence.is_empty(actions)) {
wait_writable = Fd_map.remove(fd, wait_writable);
};
}
);
};
};
module T = Domainslib.Task;
/* Engine skeleton built on a [select]-like primitive supplied by the
   subclass. [pool] is the Domainslib task pool on which due timer
   actions and ready file descriptor callbacks are run. */
class virtual select_based (pool) = {
  as self;
  inherit class select_or_poll_based;
  /* [select(fds_r, fds_w, timeout)] waits on the descriptors of
     [fds_r] (for reading) and [fds_w] (for writing) and returns the
     ready ones as [(readable, writable)]. [iter] passes [0.] for a
     non-blocking call and whatever [get_next_timeout] returns
     otherwise, which is [-1.] when no sleeper is pending. */
  pri virtual select:
    (list(Unix.file_descr), list(Unix.file_descr), float) =>
    (list(Unix.file_descr), list(Unix.file_descr));
  /* [iter(block)] runs one iteration of the engine: it merges newly
     registered sleepers into the queue, calls [select] (waiting up to
     the next deadline when [block] is [true], not at all otherwise),
     then runs the due timers and the callbacks of ready descriptors as
     tasks on [pool] and waits for all of them to finish. */
  pub iter = block => {
    /* Transfer all sleepers added since the last iteration to the
       main sleep queue: */
    sleep_queue =
      List.fold_left(
        (q, e) => Sleep_queue.add(e, q),
        sleep_queue,
        new_sleeps,
      );
    new_sleeps = [];
    /* Collect file descriptors. */
    let fds_r = Fd_map.fold((fd, _, l) => [fd, ...l], wait_readable, []);
    let fds_w = Fd_map.fold((fd, _, l) => [fd, ...l], wait_writable, []);
    /* Compute the timeout. */
    let timeout =
      if (block) {
        get_next_timeout(sleep_queue);
      } else {
        0.;
      };
    /* Do the blocking call */
    let (fds_r, fds_w) =
      try(self#select(fds_r, fds_w, timeout)) {
      | [@implicit_arity] Unix.Unix_error(Unix.EINTR, _, _) => ([], [])
      | [@implicit_arity] Unix.Unix_error(Unix.EBADF, _, _) =>
        /* Keeps only bad file descriptors. Actions registered on
           them have to handle the error: */
        (List.filter(bad_fd, fds_r), List.filter(bad_fd, fds_w))
      };
    /* Restart threads waiting for a timeout. The queue is processed
       exactly once, on the pool: the earlier code also ran a
       synchronous pass right before spawning the task, which made the
       task redundant. The queue and the timestamp are captured here so
       the task works on immutable values instead of reading the
       mutable instance variable from another domain. */
    let due_queue = sleep_queue;
    let now = Unix.gettimeofday();
    let sleep_queue_task =
      T.async(pool, () => restart_actions(due_queue, now));
    /* Restart threads waiting on a file descriptors: */
    let fds_r_tasks =
      List.map(
        fd => T.async(pool, () => invoke_actions(fd, wait_readable)),
        fds_r,
      );
    let fds_w_tasks =
      List.map(
        fd => T.async(pool, () => invoke_actions(fd, wait_writable)),
        fds_w,
      );
    /* Join everything before returning; the timer task yields the
       queue with the processed sleepers removed. */
    sleep_queue = T.await(pool, sleep_queue_task);
    List.iter(T.await(pool), fds_r_tasks);
    List.iter(T.await(pool), fds_w_tasks);
  };
};
/* Concrete engine whose [select] method delegates to [Unix.select]. */
[@warning "-7"]
class select (pool) = {
  as _;
  inherit (class select_based)(pool);
  /* Only the read and write sets are watched: the exceptional
     condition set is passed empty and its result is discarded. */
  pri select = (fds_r, fds_w, timeout) => {
    let (readable, writable, _) = Unix.select(fds_r, fds_w, [], timeout);
    (readable, writable);
  };
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment