Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
open Lwt_engine;
[@ocaml.warning "-3"];
module Lwt_sequence = Lwt_sequence;
[@ocaml.warning "+3"];
/* Maps keyed by file descriptor. Keys are ordered with the
   polymorphic [compare]. */
module Fd_map =
  Map.Make({
    type t = Unix.file_descr;
    let compare = (a: t, b: t) => compare(a, b);
  });
/* A pending timer tracked by the select engine. */
type sleeper = {
mutable time: float,
/* The time at which the sleeper should be woken up; the queue helpers
   compare it against [Unix.gettimeofday()]. */
mutable stopped: bool,
/* [true] iff the event has been stopped. Stopped sleepers are dropped
   from the queue without running [action]. */
action: unit => unit,
/* The action to run when the sleeper expires. */
};
/* Priority queue of sleepers, ordered by their [time] field. */
[@ocaml.warning "-3"]
module Sleep_queue =
  Lwt_pqueue.Make({
    type t = sleeper;
    let compare = (a, b) => compare(a.time, b.time);
  });
/* [get_next_timeout(sleep_queue)] returns the number of seconds until
   the first non-stopped sleeper of [sleep_queue] expires, clamped to
   [0.] when that time is already past, or [-1.] when no such sleeper
   exists. Stopped sleepers at the head are skipped; the caller's queue
   is not modified. */
let rec get_next_timeout = sleep_queue =>
  switch (Sleep_queue.lookup_min(sleep_queue)) {
  | None => (-1.)
  | Some(head) =>
    if (head.stopped) {
      get_next_timeout(Sleep_queue.remove_min(sleep_queue));
    } else {
      max(0., head.time -. Unix.gettimeofday());
    }
  };
/* [bad_fd(fd)] is [true] iff [Unix.fstat(fd)] fails with a
   [Unix.Unix_error], i.e. the descriptor cannot be inspected. */
let bad_fd = fd =>
  switch (Unix.fstat(fd)) {
  | _stats => false
  | exception (Unix.Unix_error(_, _, _)) => true
  };
/* [restart_actions(sleep_queue, now)] pops sleepers from the head of
   [sleep_queue] for as long as they are either stopped or due
   ([time <= now]); due sleepers have their action run. Returns the
   remaining queue. */
let rec restart_actions = (sleep_queue, now) =>
  switch (Sleep_queue.lookup_min(sleep_queue)) {
  | None => sleep_queue
  | Some(head) =>
    if (head.stopped) {
      /* Stopped sleepers are discarded without running their action. */
      restart_actions(Sleep_queue.remove_min(sleep_queue), now);
    } else if (head.time <= now) {
      /* The sleeper must leave the queue before its action runs: the
         action may change the sleeper's time, which would break the
         priority queue invariant if the sleeper were still queued. */
      let remaining = Sleep_queue.remove_min(sleep_queue);
      head.action();
      restart_actions(remaining, now);
    } else {
      sleep_queue;
    }
  };
/* [invoke_actions(fd, map)] runs every action registered for [fd] in
   [map], in sequence order. Does nothing when [fd] has no binding.
   Only the lookup is guarded: exceptions raised by the actions
   themselves propagate to the caller. */
let invoke_actions = (fd, map) =>
  switch (Fd_map.find(fd, map)) {
  | callbacks => Lwt_sequence.iter_l(run => run(), callbacks)
  | exception Not_found => ()
  };
/* State and registration logic shared by engines whose main loop is
   built on a select- or poll-like primitive. */
class virtual select_or_poll_based = {
  as _;
  inherit class abstract;
  /* Threads waiting for a timeout to expire. */
  val mutable sleep_queue = Sleep_queue.empty;
  /* Sleepers added since the last iteration of the main loop. They are
     kept out of the main sleep queue until the next iteration so that
     they are not woken up immediately. */
  val mutable new_sleeps = [];
  /* Sequences of actions waiting for file descriptors to become
     readable. */
  val mutable wait_readable = Fd_map.empty;
  /* Sequences of actions waiting for file descriptors to become
     writable. */
  val mutable wait_writable = Fd_map.empty;
  pri cleanup = ();
  /* Registers [f] to run [delay] seconds from now. When [repeat] is
     set, the sleeper re-arms itself (new deadline, re-queued in
     [new_sleeps]) just before each call to [f]. Forcing the returned
     lazy value marks the sleeper as stopped. */
  pri register_timer = (delay, repeat, f) => {
    let deadline = () => Unix.gettimeofday() +. delay;
    let sleeper =
      if (repeat) {
        let rec periodic = {time: deadline(), stopped: false, action: rearm}
        and rearm = () => {
          periodic.time = deadline();
          new_sleeps = [periodic, ...new_sleeps];
          f();
        };
        periodic;
      } else {
        let one_shot = {time: deadline(), stopped: false, action: f};
        one_shot;
      };
    new_sleeps = [sleeper, ...new_sleeps];
    lazy(sleeper.stopped = true);
  };
  /* Adds [f] to the actions waiting for [fd] to become readable,
     creating the per-descriptor sequence on first use. Forcing the
     returned lazy value removes [f] and drops the descriptor's entry
     once its sequence is empty. */
  pri register_readable = (fd, f) => {
    let pending =
      switch (Fd_map.find(fd, wait_readable)) {
      | existing => existing
      | exception Not_found =>
        let fresh = Lwt_sequence.create();
        wait_readable = Fd_map.add(fd, fresh, wait_readable);
        fresh;
      };
    let node = Lwt_sequence.add_l(f, pending);
    lazy(
      {
        Lwt_sequence.remove(node);
        if (Lwt_sequence.is_empty(pending)) {
          wait_readable = Fd_map.remove(fd, wait_readable);
        };
      }
    );
  };
  /* Same as [register_readable], for descriptors awaited for
     writability. */
  pri register_writable = (fd, f) => {
    let pending =
      switch (Fd_map.find(fd, wait_writable)) {
      | existing => existing
      | exception Not_found =>
        let fresh = Lwt_sequence.create();
        wait_writable = Fd_map.add(fd, fresh, wait_writable);
        fresh;
      };
    let node = Lwt_sequence.add_l(f, pending);
    lazy(
      {
        Lwt_sequence.remove(node);
        if (Lwt_sequence.is_empty(pending)) {
          wait_writable = Fd_map.remove(fd, wait_writable);
        };
      }
    );
  };
};
module T = Domainslib.Task;
/* Engine whose main-loop iteration is built on a virtual [select]
   method. Actions attached to ready file descriptors are dispatched as
   tasks on [pool]; expired timers are restarted on the calling side. */
class virtual select_based (pool) = {
  as self;
  inherit class select_or_poll_based;
  /* [select(fds_r, fds_w, timeout)]: implementations are expected to
     wait on the given descriptors and return the (readable, writable)
     ones. [timeout] is [0.] for a non-blocking iteration, otherwise the
     value computed by [get_next_timeout]. */
  pri virtual select:
    (list(Unix.file_descr), list(Unix.file_descr), float) =>
    (list(Unix.file_descr), list(Unix.file_descr));
  /* Runs one iteration of the main loop. When [block] is [false] the
     call to [select] uses a zero timeout. */
  pub iter = block => {
    /* Transfer all sleepers added since the last iteration to the
       main sleep queue: */
    sleep_queue =
      List.fold_left(
        (q, e) => Sleep_queue.add(e, q),
        sleep_queue,
        new_sleeps,
      );
    new_sleeps = [];
    /* Collect file descriptors. */
    let fds_r = Fd_map.fold((fd, _, l) => [fd, ...l], wait_readable, []);
    let fds_w = Fd_map.fold((fd, _, l) => [fd, ...l], wait_writable, []);
    /* Compute the timeout. */
    let timeout =
      if (block) {
        get_next_timeout(sleep_queue);
      } else {
        0.;
      };
    /* Do the blocking call */
    let (fds_r, fds_w) =
      try(self#select(fds_r, fds_w, timeout)) {
      | [@implicit_arity] Unix.Unix_error(Unix.EINTR, _, _) => ([], [])
      | [@implicit_arity] Unix.Unix_error(Unix.EBADF, _, _) =>
        /* Keeps only bad file descriptors. Actions registered on
           them have to handle the error: */
        (List.filter(bad_fd, fds_r), List.filter(bad_fd, fds_w))
      };
    /* Restart threads waiting for a timeout. This is done exactly once
       per iteration, before any pool task is spawned: timer actions may
       update [new_sleeps] and the result replaces [sleep_queue], so the
       drain must not run a second time concurrently with the
       file-descriptor tasks below. */
    sleep_queue = restart_actions(sleep_queue, Unix.gettimeofday());
    /* Restart threads waiting on file descriptors, one pool task per
       ready descriptor: */
    let fds_r_tasks =
      List.map(
        fd => T.async(pool, () => invoke_actions(fd, wait_readable)),
        fds_r,
      );
    let fds_w_tasks =
      List.map(
        fd => T.async(pool, () => invoke_actions(fd, wait_writable)),
        fds_w,
      );
    /* Wait for every dispatched action before returning. */
    List.iter(T.await(pool), fds_r_tasks);
    List.iter(T.await(pool), fds_w_tasks);
  };
};
/* Concrete engine whose [select] method is backed by [Unix.select]. */
[@warning "-7"]
class select (pool) = {
  as _;
  inherit (class select_based)(pool);
  /* Delegates to [Unix.select] with an empty exceptional-condition
     list and discards the third component of its result. */
  pri select = (fds_r, fds_w, timeout) =>
    switch (Unix.select(fds_r, fds_w, [], timeout)) {
    | (readable, writable, _) => (readable, writable)
    };
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment