Last active
July 7, 2018 12:02
-
-
Save danielrichman/b425c2d23ac5177d0d543e121cf247cf to your computer and use it in GitHub Desktop.
baby async+epoll implementation, and proof-of-concept web server using it
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open Core | |
module Monitor = struct | |
type t = unit | |
end | |
module Scheduler0 : sig | |
val enqueue | |
: (unit -> unit) | |
-> Monitor.t | |
-> unit | |
val run_one_cycle : unit -> unit | |
val current_monitor : unit -> Monitor.t | |
val internal__has_any_jobs : unit -> bool | |
end = struct | |
let current_monitor_ref = ref (() : Monitor.t) (* the "toplevel" monitor *) | |
let current_monitor () = !current_monitor_ref | |
type job = | |
{ thunk : unit -> unit | |
; monitor : Monitor.t | |
} | |
let job_queue : job Queue.t = Queue.create () | |
let enqueue thunk monitor = | |
Queue.enqueue job_queue { thunk; monitor } | |
let run_one_cycle () = | |
let jobs = Queue.to_list job_queue in | |
Queue.clear job_queue; | |
List.iter jobs (fun { thunk; monitor } -> | |
current_monitor_ref := monitor; | |
match thunk () with | |
| () -> () | |
| exception exn -> | |
(* Monitor.send_exn monitor exn *) | |
raise_s [%message "sending exns to monitors not implemented" (exn : exn)] | |
) | |
let internal__has_any_jobs () = not (Queue.is_empty job_queue) | |
end | |
module Ivar : sig | |
type 'a t | |
val create : unit -> 'a t | |
val fill : 'a t -> 'a -> unit | |
val internal__upon : 'a t -> f:('a -> unit) -> unit | |
end = struct | |
type 'a t' = | |
| Full of 'a | |
| Empty of { handlers : (('a -> unit) * Monitor.t) Queue.t } | |
type 'a t = 'a t' ref | |
let create () = ref (Empty { handlers = Queue.create () }) | |
let fill t value = | |
match !t with | |
| Full _ -> failwith "fill called twice" | |
| Empty { handlers } -> | |
t := Full value; | |
Queue.iter handlers ~f:(fun (f, monitor) -> | |
Scheduler0.enqueue (fun () -> f value) monitor | |
) | |
let internal__upon t ~f = | |
let monitor = Scheduler0.current_monitor () in | |
match !t with | |
| Full value -> Scheduler0.enqueue (fun () -> f value) monitor | |
| Empty { handlers } -> Queue.enqueue handlers (f, monitor) | |
end | |
module Deferred : sig | |
type 'a t | |
val return : 'a -> 'a t | |
val bind : 'a t -> f:('a -> 'b t) -> 'b t | |
val map : 'a t -> f:('a -> 'b) -> 'b t | |
module Let_syntax : sig | |
val return : 'a -> 'a t | |
module Let_syntax : sig | |
type nonrec 'a t = 'a t | |
val return : 'a -> 'a t | |
val bind : 'a t -> f:('a -> 'b t) -> 'b t | |
val map : 'a t -> f:('a -> 'b) -> 'b t | |
module Open_on_rhs : sig end | |
end | |
end | |
val internal__of_ivar : 'a Ivar.t -> 'a t | |
end = struct | |
type 'a t = 'a Ivar.t | |
let return v = | |
let t = Ivar.create () in | |
Ivar.fill t v; | |
t | |
let bind t ~f = | |
let r = Ivar.create () in | |
Ivar.internal__upon t ~f:(fun value -> | |
let t2 = f value in | |
Ivar.internal__upon t2 ~f:(fun value2 -> | |
Ivar.fill r value2 | |
) | |
); | |
r | |
let map t ~f = | |
bind t ~f:(fun v -> return (f v)) | |
module Let_syntax = struct | |
let return = return | |
module Let_syntax = struct | |
type nonrec 'a t = 'a t | |
let return = return | |
let bind = bind | |
let map = map | |
module Open_on_rhs = struct end | |
end | |
end | |
let internal__of_ivar t = t | |
end | |
module Scheduler : sig | |
include (module type of struct include Scheduler0 end) | |
val go : unit -> unit | |
val ready_to_read : Unix.File_descr.t -> unit Deferred.t | |
val ready_to_write : Unix.File_descr.t -> unit Deferred.t | |
end = struct | |
include Scheduler0 | |
module Epoll = Linux_ext.Epoll | |
let the_epoll = | |
let create = Or_error.ok_exn Epoll.create in | |
create ~num_file_descrs:1024 ~max_ready_events:128 | |
let fds_waiting_to_read = Unix.File_descr.Table.create () | |
let fds_waiting_to_write = Unix.File_descr.Table.create () | |
let update_epoll_flags fd = | |
let r = | |
match (Hashtbl.mem fds_waiting_to_read fd) with | |
| false -> Epoll.Flags.empty | |
| true -> Epoll.Flags.(in_ + hup) | |
in | |
let w = | |
match (Hashtbl.mem fds_waiting_to_write fd) with | |
| false -> Epoll.Flags.empty | |
| true -> Epoll.Flags.out | |
in | |
let desired = Epoll.Flags.(+) r w in | |
match Epoll.Flags.is_empty desired with | |
| true -> Epoll.remove the_epoll fd | |
| false -> Epoll.set the_epoll fd desired | |
let ready_to_read fd = | |
let r = Hashtbl.find_or_add fds_waiting_to_read fd ~default:Ivar.create in | |
update_epoll_flags fd; | |
Deferred.internal__of_ivar r | |
let ready_to_write fd = | |
let r = Hashtbl.find_or_add fds_waiting_to_write fd ~default:Ivar.create in | |
update_epoll_flags fd; | |
Deferred.internal__of_ivar r | |
let do_one_epoll () = | |
let timeout = | |
match internal__has_any_jobs () with | |
| true -> `Immediately | |
| false -> `Never | |
in | |
begin | |
match Epoll.wait the_epoll ~timeout with | |
| `Ok | `Timeout -> () | |
| exception (Unix.Unix_error (EINTR, _, _)) -> () | |
end; | |
Epoll.iter_ready the_epoll ~f:(fun fd flags -> | |
begin | |
match Epoll.Flags.do_intersect flags Epoll.Flags.(in_ + hup) with | |
| false -> () | |
| true -> | |
match Hashtbl.find_and_remove fds_waiting_to_read fd with | |
| None -> () | |
| Some ivar -> Ivar.fill ivar () | |
end; | |
begin | |
match Epoll.Flags.do_intersect flags Epoll.Flags.out with | |
| false -> () | |
| true -> | |
match Hashtbl.find_and_remove fds_waiting_to_write fd with | |
| None -> () | |
| Some ivar -> Ivar.fill ivar () | |
end; | |
update_epoll_flags fd | |
) | |
let go () = | |
while true; do | |
run_one_cycle (); | |
do_one_epoll () | |
done | |
end | |
module Http : sig | |
type page = | |
{ content_type : string | |
; payload : string | |
} | |
val run_server | |
: pages:page String.Map.t | |
-> port:int | |
-> unit Deferred.t | |
end = struct | |
open Deferred.Let_syntax | |
type page = | |
{ content_type : string | |
; payload : string | |
} | |
let request_regex = Or_error.ok_exn (Re2.create "^GET ([^\\n]*) HTTP/1.[01]\\r?\\n(.*\\r?\\n)*\\r?\\n") | |
let read_request sock = | |
let buf = Bytes.make 1024 '\000' in | |
let pos = ref 0 in | |
let rec loop () = | |
let%bind () = Scheduler.ready_to_read sock in | |
match Unix.read sock ~buf ~pos:!pos ~len:(1024 - !pos) with | |
| exception (Unix.Unix_error ((EINTR|EAGAIN|EWOULDBLOCK), _, _)) -> loop () | |
| bytes -> | |
pos := !pos + bytes; | |
let buf' = Bytes.unsafe_to_string ~no_mutation_while_string_reachable:buf in | |
match Re2.find_first request_regex buf' ~sub:(`Index 1) with | |
| Ok path -> return (Ok path) | |
| Error _ -> | |
match (bytes = 0, !pos = 1024) with | |
| (true, _) -> return (Or_error.error_string "eof") | |
| (_, true) -> return (Or_error.error_string "request too long") | |
| (false, false) -> loop () | |
in | |
loop () | |
let write_response sock payload = | |
let pos = ref 0 in | |
let buf = Bytes.unsafe_of_string_promise_no_mutation payload in | |
let rec loop () = | |
let%bind () = Scheduler.ready_to_write sock in | |
match Unix.write sock ~buf ~pos:!pos with | |
| exception (Unix.Unix_error ((EINTR|EAGAIN|EWOULDBLOCK), _, _)) -> loop () | |
| exception (Unix.Unix_error ((EPIPE|ECONNRESET), _, _)) -> return () | |
| bytes -> | |
pos := !pos + bytes; | |
match !pos = String.length payload with | |
| true -> return () | |
| false -> loop () | |
in | |
loop () | |
let handle_client ~pages sock = | |
let%bind path = read_request sock in | |
let%bind () = | |
match path with | |
| Error _ -> | |
write_response sock "HTTP/1.0 400 Bad Request\r\n\r\n" | |
| Ok path -> | |
match Map.find pages path with | |
| None -> | |
write_response sock "HTTP/1.0 404 Not Found\r\n\r\n" | |
| Some { content_type; payload } -> | |
let%bind () = | |
write_response sock | |
(sprintf !"HTTP/1.0 200 OK\r\nContent-type: %s\r\n\r\n" content_type) | |
in | |
write_response sock payload | |
in | |
Unix.close sock; | |
return () | |
let run_server ~pages ~port = | |
Signal.ignore Signal.pipe; | |
let server_sock = Unix.socket ~domain:PF_INET ~kind:SOCK_STREAM ~protocol:0 in | |
Unix.setsockopt server_sock SO_REUSEADDR true; | |
Unix.bind server_sock (ADDR_INET (Unix.Inet_addr.bind_any, port)); | |
Unix.listen server_sock ~backlog:10; | |
Unix.set_nonblock server_sock; | |
let rec loop () = | |
let%bind () = Scheduler.ready_to_read server_sock in | |
match Unix.accept server_sock with | |
| exception (Unix.Unix_error ((EINTR|EAGAIN|EWOULDBLOCK), _, _)) -> loop () | |
| (sock, _) -> | |
Unix.set_nonblock sock; | |
ignore (handle_client ~pages sock : unit Deferred.t); | |
loop () | |
in | |
loop () | |
end | |
let () = | |
let pages = | |
String.Map.of_alist_exn | |
[ ("/" | |
, { Http. | |
content_type = "text/html" | |
; payload = {| <a href="cat.jpg">Hello World</a> |} | |
} | |
) | |
; ("/cat.jpg" | |
, { content_type = "image/jpeg" | |
; payload = In_channel.read_all "cat.jpg" | |
} | |
) | |
] | |
in | |
ignore (Http.run_server ~pages ~port:8080 : unit Deferred.t); | |
Exn.handle_uncaught_and_exit Scheduler.go |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(*_ Intentionally empty. *) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(jbuild_version 1) | |
(executable ( | |
(name miniasync) | |
(libraries (core re2)) | |
(preprocess (pps (ppx_jane))) | |
)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment