open Lwt_engine; | |
[@ocaml.warning "-3"]; | |
module Lwt_sequence = Lwt_sequence; | |
[@ocaml.warning "+3"]; | |
module Fd_map = | |
Map.Make({ | |
type t = Unix.file_descr; | |
let compare = compare; | |
}); | |
/* Type of a sleeper for the select engine. */ | |
type sleeper = { | |
mutable time: float, | |
/* The time at which the sleeper should be wakeup. */ | |
mutable stopped: bool, | |
/* [true] iff the event has been stopped. */ | |
action: unit => unit, | |
/* The action for the sleeper. */ | |
}; | |
[@ocaml.warning "-3"] | |
module Sleep_queue = | |
Lwt_pqueue.Make({ | |
type t = sleeper; | |
let compare = ({time: t1, _}, {time: t2, _}) => compare(t1, t2); | |
}); | |
let rec get_next_timeout = sleep_queue => | |
switch (Sleep_queue.lookup_min(sleep_queue)) { | |
| Some({stopped: true, _}) => | |
get_next_timeout(Sleep_queue.remove_min(sleep_queue)) | |
| Some({time, _}) => max(0., time -. Unix.gettimeofday()) | |
| None => (-1.) | |
}; | |
let bad_fd = fd => | |
try({ | |
let _ = Unix.fstat(fd); | |
false; | |
}) { | |
| Unix.Unix_error(_, _, _) => true | |
}; | |
let rec restart_actions = (sleep_queue, now) => | |
switch (Sleep_queue.lookup_min(sleep_queue)) { | |
| Some({stopped: true, _}) => | |
restart_actions(Sleep_queue.remove_min(sleep_queue), now) | |
| Some({time, action, _}) when time <= now => | |
/* We have to remove the sleeper to the queue before performing | |
the action. The action can change the sleeper's time, and this | |
might break the priority queue invariant if the sleeper is | |
still in the queue. */ | |
let q = Sleep_queue.remove_min(sleep_queue); | |
action(); | |
restart_actions(q, now); | |
| _ => sleep_queue | |
}; | |
let invoke_actions = (fd, map) => | |
switch ( | |
try(Some(Fd_map.find(fd, map))) { | |
| Not_found => None | |
} | |
) { | |
| Some(actions) => Lwt_sequence.iter_l(f => f(), actions) | |
| None => () | |
}; | |
class virtual select_or_poll_based = { | |
as _; | |
inherit class abstract; | |
val mutable sleep_queue = Sleep_queue.empty; | |
/* Threads waiting for a timeout to expire. */ | |
val mutable new_sleeps = []; | |
/* Sleepers added since the last iteration of the main loop: | |
They are not added immediately to the main sleep queue in order | |
to prevent them from being wakeup immediately. */ | |
val mutable wait_readable = Fd_map.empty; | |
/* Sequences of actions waiting for file descriptors to become | |
readable. */ | |
val mutable wait_writable = Fd_map.empty; | |
/* Sequences of actions waiting for file descriptors to become | |
writable. */ | |
pri cleanup = (); | |
pri register_timer = (delay, repeat, f) => | |
if (repeat) { | |
let rec sleeper = { | |
time: Unix.gettimeofday() +. delay, | |
stopped: false, | |
action: g, | |
} | |
and g = () => { | |
sleeper.time = Unix.gettimeofday() +. delay; | |
new_sleeps = [sleeper, ...new_sleeps]; | |
f(); | |
}; | |
new_sleeps = [sleeper, ...new_sleeps]; | |
lazy(sleeper.stopped = true); | |
} else { | |
let sleeper = { | |
time: Unix.gettimeofday() +. delay, | |
stopped: false, | |
action: f, | |
}; | |
new_sleeps = [sleeper, ...new_sleeps]; | |
lazy(sleeper.stopped = true); | |
}; | |
pri register_readable = (fd, f) => { | |
let actions = | |
try(Fd_map.find(fd, wait_readable)) { | |
| Not_found => | |
let actions = Lwt_sequence.create(); | |
wait_readable = Fd_map.add(fd, actions, wait_readable); | |
actions; | |
}; | |
let node = Lwt_sequence.add_l(f, actions); | |
lazy( | |
{ | |
Lwt_sequence.remove(node); | |
if (Lwt_sequence.is_empty(actions)) { | |
wait_readable = Fd_map.remove(fd, wait_readable); | |
}; | |
} | |
); | |
}; | |
pri register_writable = (fd, f) => { | |
let actions = | |
try(Fd_map.find(fd, wait_writable)) { | |
| Not_found => | |
let actions = Lwt_sequence.create(); | |
wait_writable = Fd_map.add(fd, actions, wait_writable); | |
actions; | |
}; | |
let node = Lwt_sequence.add_l(f, actions); | |
lazy( | |
{ | |
Lwt_sequence.remove(node); | |
if (Lwt_sequence.is_empty(actions)) { | |
wait_writable = Fd_map.remove(fd, wait_writable); | |
}; | |
} | |
); | |
}; | |
}; | |
module T = Domainslib.Task; | |
class virtual select_based (pool) = { | |
as self; | |
inherit class select_or_poll_based; | |
pri virtual select: | |
(list(Unix.file_descr), list(Unix.file_descr), float) => | |
(list(Unix.file_descr), list(Unix.file_descr)); | |
pub iter = block => { | |
/* Transfer all sleepers added since the last iteration to the | |
main sleep queue: */ | |
sleep_queue = | |
List.fold_left( | |
(q, e) => Sleep_queue.add(e, q), | |
sleep_queue, | |
new_sleeps, | |
); | |
new_sleeps = []; | |
/* Collect file descriptors. */ | |
let fds_r = Fd_map.fold((fd, _, l) => [fd, ...l], wait_readable, []); | |
let fds_w = Fd_map.fold((fd, _, l) => [fd, ...l], wait_writable, []); | |
/* Compute the timeout. */ | |
let timeout = | |
if (block) { | |
get_next_timeout(sleep_queue); | |
} else { | |
0.; | |
}; | |
/* Do the blocking call */ | |
let (fds_r, fds_w) = | |
try(self#select(fds_r, fds_w, timeout)) { | |
| [@implicit_arity] Unix.Unix_error(Unix.EINTR, _, _) => ([], []) | |
| [@implicit_arity] Unix.Unix_error(Unix.EBADF, _, _) => | |
/* Keeps only bad file descriptors. Actions registered on | |
them have to handle the error: */ | |
(List.filter(bad_fd, fds_r), List.filter(bad_fd, fds_w)) | |
}; | |
/* Restart threads waiting for a timeout: */ | |
sleep_queue = restart_actions(sleep_queue, Unix.gettimeofday()); | |
let sleep_queue_task = | |
T.async(pool, () => restart_actions(sleep_queue, Unix.gettimeofday())); | |
/* Restart threads waiting on a file descriptors: */ | |
let fds_r_tasks = | |
List.map( | |
fd => T.async(pool, () => invoke_actions(fd, wait_readable)), | |
fds_r, | |
); | |
let fds_w_tasks = | |
List.map( | |
fd => T.async(pool, () => invoke_actions(fd, wait_writable)), | |
fds_w, | |
); | |
sleep_queue = T.await(pool, sleep_queue_task); | |
List.iter(T.await(pool), fds_r_tasks); | |
List.iter(T.await(pool), fds_w_tasks); | |
}; | |
}; | |
[@warning "-7"] | |
class select (pool) = { | |
as _; | |
inherit (class select_based)(pool); | |
pri select = (fds_r, fds_w, timeout) => { | |
let (fds_r, fds_w, _) = Unix.select(fds_r, fds_w, [], timeout); | |
(fds_r, fds_w); | |
}; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment