Skip to content

Instantly share code, notes, and snippets.

@danielrichman
Last active July 7, 2018 12:02
Show Gist options
  • Save danielrichman/b425c2d23ac5177d0d543e121cf247cf to your computer and use it in GitHub Desktop.
Save danielrichman/b425c2d23ac5177d0d543e121cf247cf to your computer and use it in GitHub Desktop.
baby async+epoll implementation, and proof-of-concept web server using it
open Core
(** Stand-in for async-style monitors.  The only monitor value is [()], so every job
    in this program runs under the same monitor. *)
module Monitor = struct
  (** A monitor; currently carries no information. *)
  type t = unit
end
(** The job queue at the heart of the scheduler: thunks are enqueued together with a
    monitor and later run in batches ("cycles"). *)
module Scheduler0 : sig
  (** [enqueue thunk monitor] appends [thunk] to the job queue.  It is run by a later
      call to [run_one_cycle], with [monitor] installed as the current monitor. *)
  val enqueue
    :  (unit -> unit)
    -> Monitor.t
    -> unit

  (** Runs, in order, every job that was queued when the call started.  Jobs enqueued
      while the cycle is running are left for the next cycle.  Raises if a job raises
      (forwarding exceptions to monitors is not implemented). *)
  val run_one_cycle : unit -> unit

  (** The monitor of the job most recently started by [run_one_cycle]; before any job
      has run, the "toplevel" monitor. *)
  val current_monitor : unit -> Monitor.t

  (** [true] iff at least one job is waiting in the queue. *)
  val internal__has_any_jobs : unit -> bool
end = struct
  let current_monitor_ref = ref (() : Monitor.t) (* the "toplevel" monitor *)
  let current_monitor () = !current_monitor_ref

  type job =
    { thunk : unit -> unit
    ; monitor : Monitor.t
    }

  let job_queue : job Queue.t = Queue.create ()

  let enqueue thunk monitor =
    Queue.enqueue job_queue { thunk; monitor }

  let run_one_cycle () =
    (* Snapshot and clear the queue up front, so that anything enqueued by the jobs we
       are about to run waits for the next cycle. *)
    let jobs = Queue.to_list job_queue in
    Queue.clear job_queue;
    (* Core's [List.iter] takes its function as [~f]; pass the label explicitly rather
       than relying on label omission (compiler warning 6). *)
    List.iter jobs ~f:(fun { thunk; monitor } ->
      current_monitor_ref := monitor;
      match thunk () with
      | () -> ()
      | exception exn ->
        (* Monitor.send_exn monitor exn *)
        raise_s [%message "sending exns to monitors not implemented" (exn : exn)])

  let internal__has_any_jobs () = not (Queue.is_empty job_queue)
end
(** A write-once cell.  Callbacks registered on it are run, via the scheduler, once a
    value is available. *)
module Ivar : sig
  type 'a t

  (** A fresh, empty ivar. *)
  val create : unit -> 'a t

  (** [fill t v] stores [v] in [t] and enqueues a job for every callback registered so
      far.  Raises [Failure "fill called twice"] if [t] already holds a value. *)
  val fill : 'a t -> 'a -> unit

  (** [internal__upon t ~f] arranges for [f] to be called with [t]'s value from a
      scheduler job, under the monitor that is current at registration time.  If [t]
      is already full the job is enqueued immediately. *)
  val internal__upon : 'a t -> f:('a -> unit) -> unit
end = struct
  (* A callback paired with the monitor it must run under. *)
  type 'a callback = ('a -> unit) * Monitor.t

  type 'a state =
    | Empty of 'a callback Queue.t
    | Full of 'a

  type 'a t = 'a state ref

  let create () = ref (Empty (Queue.create ()))

  (* Enqueue a job that applies the callback to [value] under its monitor. *)
  let schedule (f, monitor) value = Scheduler0.enqueue (fun () -> f value) monitor

  let fill t value =
    match !t with
    | Full _ -> failwith "fill called twice"
    | Empty waiting ->
      t := Full value;
      Queue.iter waiting ~f:(fun callback -> schedule callback value)

  let internal__upon t ~f =
    let callback = (f, Scheduler0.current_monitor ()) in
    match !t with
    | Empty waiting -> Queue.enqueue waiting callback
    | Full value -> schedule callback value
end
(** A value that may not be available yet, backed by an [Ivar.t].  Also provides the
    [Let_syntax] module used by [let%bind] / [let%map]. *)
module Deferred : sig
  type 'a t

  (** A deferred that already holds the given value. *)
  val return : 'a -> 'a t

  (** [bind t ~f] is a deferred filled with the value of [f v] once [t] has produced
      [v] and [f v] has itself produced a value. *)
  val bind : 'a t -> f:('a -> 'b t) -> 'b t

  (** [map t ~f] is a deferred filled with [f v] once [t] has produced [v]. *)
  val map : 'a t -> f:('a -> 'b) -> 'b t

  module Let_syntax : sig
    val return : 'a -> 'a t

    module Let_syntax : sig
      type nonrec 'a t = 'a t
      val return : 'a -> 'a t
      val bind : 'a t -> f:('a -> 'b t) -> 'b t
      val map : 'a t -> f:('a -> 'b) -> 'b t
      module Open_on_rhs : sig end
    end
  end

  (** Views an ivar as the deferred that becomes determined when the ivar is filled. *)
  val internal__of_ivar : 'a Ivar.t -> 'a t
end = struct
  type 'a t = 'a Ivar.t

  let return value =
    let filled = Ivar.create () in
    Ivar.fill filled value;
    filled

  let bind t ~f =
    let result = Ivar.create () in
    (* Once the inner deferred produces a value, copy it into [result]. *)
    let forward inner = Ivar.internal__upon inner ~f:(Ivar.fill result) in
    Ivar.internal__upon t ~f:(fun value -> forward (f value));
    result

  let map t ~f = bind t ~f:(fun value -> value |> f |> return)

  module Let_syntax = struct
    let return = return

    module Let_syntax = struct
      type nonrec 'a t = 'a t

      let return = return
      let bind = bind
      let map = map

      module Open_on_rhs = struct end
    end
  end

  let internal__of_ivar ivar = ivar
end
(** [Scheduler0] extended with an epoll-driven main loop and fd-readiness deferreds. *)
module Scheduler : sig
  include (module type of struct include Scheduler0 end)

  (** Runs the main loop forever: one cycle of jobs, then one epoll wait, repeatedly. *)
  val go : unit -> unit

  (** A deferred filled the next time epoll reports the fd as readable (or hung up, or
      in error).  Concurrent callers waiting on the same fd share one deferred. *)
  val ready_to_read : Unix.File_descr.t -> unit Deferred.t

  (** As [ready_to_read], but for writability. *)
  val ready_to_write : Unix.File_descr.t -> unit Deferred.t
end = struct
  include Scheduler0
  module Epoll = Linux_ext.Epoll

  (* The single epoll set, created for 1024 file descriptors and at most 128 ready
     events per wait.  Raises at module initialisation if epoll is unavailable. *)
  let the_epoll =
    let create = Or_error.ok_exn Epoll.create in
    create ~num_file_descrs:1024 ~max_ready_events:128

  (* For each fd with a pending waiter, the ivar to fill when it becomes ready. *)
  let fds_waiting_to_read = Unix.File_descr.Table.create ()
  let fds_waiting_to_write = Unix.File_descr.Table.create ()

  (* Hang-up and error conditions carry no direction, and epoll reports them whether
     or not they were requested, so they must wake readers and writers alike.
     Otherwise a waiter on such an fd would never be woken while the level-triggered
     event kept making [Epoll.wait] return immediately. *)
  let wakes_readers = Epoll.Flags.(in_ + hup + err)
  let wakes_writers = Epoll.Flags.(out + hup + err)

  (* Make the epoll registration of [fd] match the two waiter tables, removing the fd
     from the epoll set when nobody is waiting on it. *)
  let update_epoll_flags fd =
    let r =
      match Hashtbl.mem fds_waiting_to_read fd with
      | false -> Epoll.Flags.empty
      | true -> Epoll.Flags.(in_ + hup)
    in
    let w =
      match Hashtbl.mem fds_waiting_to_write fd with
      | false -> Epoll.Flags.empty
      | true -> Epoll.Flags.out
    in
    let desired = Epoll.Flags.(+) r w in
    match Epoll.Flags.is_empty desired with
    | true -> Epoll.remove the_epoll fd
    | false -> Epoll.set the_epoll fd desired

  let ready_to_read fd =
    let r = Hashtbl.find_or_add fds_waiting_to_read fd ~default:Ivar.create in
    update_epoll_flags fd;
    Deferred.internal__of_ivar r

  let ready_to_write fd =
    let r = Hashtbl.find_or_add fds_waiting_to_write fd ~default:Ivar.create in
    update_epoll_flags fd;
    Deferred.internal__of_ivar r

  (* Fill and forget the waiter registered for [fd] in [table], if there is one. *)
  let wake table fd =
    match Hashtbl.find_and_remove table fd with
    | None -> ()
    | Some ivar -> Ivar.fill ivar ()

  let do_one_epoll () =
    (* Only block in epoll when there is nothing else to run. *)
    let timeout =
      match internal__has_any_jobs () with
      | true -> `Immediately
      | false -> `Never
    in
    begin
      match Epoll.wait the_epoll ~timeout with
      | `Ok | `Timeout -> ()
      | exception (Unix.Unix_error (EINTR, _, _)) -> ()
    end;
    Epoll.iter_ready the_epoll ~f:(fun fd flags ->
      if Epoll.Flags.do_intersect flags wakes_readers
      then wake fds_waiting_to_read fd;
      if Epoll.Flags.do_intersect flags wakes_writers
      then wake fds_waiting_to_write fd;
      (* The woken waiters are gone from the tables; drop their interest. *)
      update_epoll_flags fd)

  let go () =
    while true do
      run_one_cycle ();
      do_one_epoll ()
    done
end
(** A minimal HTTP/1.0 server that answers [GET] requests from a fixed map of pages. *)
module Http : sig
  (** A servable document: the value of its [Content-type] header and its body. *)
  type page =
    { content_type : string
    ; payload : string
    }

  (** [run_server ~pages ~port] listens on [port] on all interfaces and serves
      [pages], keyed by request path.  It also ignores SIGPIPE process-wide.  The
      accept loop never finishes normally, so the result is not expected to become
      determined.  Raises if the listening socket cannot be set up. *)
  val run_server
    :  pages:page String.Map.t
    -> port:int
    -> unit Deferred.t
end = struct
  open Deferred.Let_syntax

  type page =
    { content_type : string
    ; payload : string
    }

  (* Requests (request line plus headers) longer than this are rejected. *)
  let max_request_len = 1024

  (* Matches a complete GET request head and captures the path.  The dot in the
     version is escaped so that only "HTTP/1.0" and "HTTP/1.1" are accepted. *)
  let request_regex =
    Or_error.ok_exn
      (Re2.create "^GET ([^\\n]*) HTTP/1\\.[01]\\r?\\n(.*\\r?\\n)*\\r?\\n")

  (* Reads from [sock] until a full request head has arrived and returns the requested
     path, or an error on end-of-file, connection reset, or an over-long request. *)
  let read_request sock =
    let buf = Bytes.make max_request_len '\000' in
    let pos = ref 0 in
    let rec loop () =
      let%bind () = Scheduler.ready_to_read sock in
      match Unix.read sock ~buf ~pos:!pos ~len:(max_request_len - !pos) with
      | exception (Unix.Unix_error ((EINTR|EAGAIN|EWOULDBLOCK), _, _)) -> loop ()
      | exception (Unix.Unix_error (ECONNRESET, _, _)) ->
        (* A peer resetting its connection must not take the whole server down;
           report it like any other failed request. *)
        return (Or_error.error_string "connection reset")
      | bytes ->
        pos := !pos + bytes;
        let buf' = Bytes.unsafe_to_string ~no_mutation_while_string_reachable:buf in
        (match Re2.find_first request_regex buf' ~sub:(`Index 1) with
         | Ok path -> return (Ok path)
         | Error _ ->
           (match (bytes = 0, !pos = max_request_len) with
            | (true, _) -> return (Or_error.error_string "eof")
            | (_, true) -> return (Or_error.error_string "request too long")
            | (false, false) -> loop ()))
    in
    loop ()

  (* Writes all of [payload] to [sock].  Gives up silently if the peer has gone away
     (EPIPE / ECONNRESET). *)
  let write_response sock payload =
    let pos = ref 0 in
    let buf = Bytes.unsafe_of_string_promise_no_mutation payload in
    let rec loop () =
      let%bind () = Scheduler.ready_to_write sock in
      match Unix.write sock ~buf ~pos:!pos with
      | exception (Unix.Unix_error ((EINTR|EAGAIN|EWOULDBLOCK), _, _)) -> loop ()
      | exception (Unix.Unix_error ((EPIPE|ECONNRESET), _, _)) -> return ()
      | bytes ->
        pos := !pos + bytes;
        (match !pos = String.length payload with
         | true -> return ()
         | false -> loop ())
    in
    loop ()

  (* Serves one request on [sock] (400 for an unreadable request, 404 for an unknown
     path, otherwise the page), then closes the socket. *)
  let handle_client ~pages sock =
    let%bind path = read_request sock in
    let%bind () =
      match path with
      | Error _ ->
        write_response sock "HTTP/1.0 400 Bad Request\r\n\r\n"
      | Ok path ->
        (match Map.find pages path with
         | None ->
           write_response sock "HTTP/1.0 404 Not Found\r\n\r\n"
         | Some { content_type; payload } ->
           let%bind () =
             write_response sock
               (sprintf !"HTTP/1.0 200 OK\r\nContent-type: %s\r\n\r\n" content_type)
           in
           write_response sock payload)
    in
    Unix.close sock;
    return ()

  let run_server ~pages ~port =
    (* Writes to a closed connection should fail with EPIPE, not kill the process. *)
    Signal.ignore Signal.pipe;
    let server_sock = Unix.socket ~domain:PF_INET ~kind:SOCK_STREAM ~protocol:0 in
    Unix.setsockopt server_sock SO_REUSEADDR true;
    Unix.bind server_sock (ADDR_INET (Unix.Inet_addr.bind_any, port));
    Unix.listen server_sock ~backlog:10;
    Unix.set_nonblock server_sock;
    let rec loop () =
      let%bind () = Scheduler.ready_to_read server_sock in
      match Unix.accept server_sock with
      (* ECONNABORTED means a queued connection went away before we accepted it; that
         is the client's problem, so just wait for the next one. *)
      | exception (Unix.Unix_error ((EINTR|EAGAIN|EWOULDBLOCK|ECONNABORTED), _, _)) ->
        loop ()
      | (sock, _) ->
        Unix.set_nonblock sock;
        (* Clients are served concurrently; nobody waits for this deferred. *)
        ignore (handle_client ~pages sock : unit Deferred.t);
        loop ()
    in
    loop ()
end
(* Entry point: serve an index page and "cat.jpg" (read from the current directory at
   startup) on port 8080, then hand control to the scheduler loop. *)
let () =
  let index_page =
    { Http.content_type = "text/html"
    ; payload = {| <a href="cat.jpg">Hello World</a> |}
    }
  in
  let cat_page =
    { Http.content_type = "image/jpeg"
    ; payload = In_channel.read_all "cat.jpg"
    }
  in
  let pages =
    String.Map.of_alist_exn
      [ ("/", index_page)
      ; ("/cat.jpg", cat_page)
      ]
  in
  (* The server's deferred is deliberately dropped; the scheduler drives it. *)
  let (_ : unit Deferred.t) = Http.run_server ~pages ~port:8080 in
  Exn.handle_uncaught_and_exit Scheduler.go
(*_ Intentionally empty. *)
; Build description for the program above (jbuilder syntax, version 1).
(jbuild_version 1)
; A single executable named miniasync, linked against core and re2, with its
; sources preprocessed by ppx_jane.
(executable (
(name miniasync)
(libraries (core re2))
(preprocess (pps (ppx_jane)))
))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment