Skip to content

Instantly share code, notes, and snippets.

@sgrove
Last active February 11, 2019 12:24
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sgrove/b2903ed1012f0270f156801f034c7d0c to your computer and use it in GitHub Desktop.
Save sgrove/b2903ed1012f0270f156801f034c7d0c to your computer and use it in GitHub Desktop.
Bad interaction between Async wrapper around ocurl and httpaf
[@@ocaml.doc " Async support for Curl, see https://github.com/ygrek/ocurl/blob/master/curl_lwt.ml "]
(** Short alias for the curl multi interface used throughout this file. *)
module M = Curl.Multi
(** Ivar that is filled with [()] to stop a running fd watcher. *)
type interrupt = unit Async.Ivar.t
(** State returned by [create] and shared with [perform]. *)
type multi =
{
(* The underlying curl multi handle. *)
mt: Curl.Multi.mt ;
(* For each fd, the interrupts of the watchers currently installed for it. *)
all_events: (Unix.file_descr, interrupt list) Hashtbl.t ;
(* For each easy handle in flight, the ivar that receives its result code. *)
wakeners: (Curl.t, Curl.curlCode Async.Ivar.t) Hashtbl.t }
(** When [true], [logFdState] inside [create] emits per-fd trace lines. *)
let debug = false
(** [create ()] builds a curl multi handle, registers the timer and socket
    callbacks defined below on it, and returns it together with the
    [all_events] and [wakeners] tables. *)
let create () =
let mt = M.create () in
(* Cancellation ivar of the most recently armed timer; [start_new_timer]
   fills the old one and stores a fresh one here. *)
let timer_cancel = ref (Async.Ivar.create ()) in
let all_events = Hashtbl.create 32 in
let wakeners = Hashtbl.create 32 in
(* fd -> result of [createAsyncFd] for that fd. *)
let fdMap = Hashtbl.create 32 in
(* fd -> ivar filled once an in-flight close of the async fd has finished. *)
let fdClosing = Hashtbl.create 32 in
(* fd -> ivar that receives the async fd re-created after a pending close. *)
let fdCreateAfterClose = Hashtbl.create 32 in
(* fd -> [`Watching] or [`Closed], as last recorded by [processSocketEvent]. *)
let fdState = Hashtbl.create 32 in
let finished _s =
  (* Drain every transfer the multi handle reports as finished and pass its
     result code to the ivar registered for that handle in [wakeners].  A
     handle without a registered ivar is logged and skipped.  The argument
     is a caller label and is not used. *)
  let rec drain () =
    match M.remove_finished mt with
    | None -> ()
    | Some (handle, code) ->
      (match Hashtbl.find_opt wakeners handle with
       | Some wakener ->
         Hashtbl.remove wakeners handle;
         Async.Ivar.fill wakener code
       | None -> Core.Log.error "curl_async: orphan handle");
      drain ()
  in
  drain () in
let on_readable fd _ =
  (* Report read-readiness of [fd] to curl, then collect finished transfers.
     The count returned by [M.action] is not needed. *)
  ignore (M.action mt fd M.EV_IN : int);
  finished "on_readable" in
let on_writable fd _ =
  (* Report write-readiness of [fd] to curl, then collect finished transfers.
     The count returned by [M.action] is not needed. *)
  ignore (M.action mt fd M.EV_OUT : int);
  finished "on_writable" in
let on_timer _ =
  (* Tell curl that its timeout elapsed, then collect finished transfers. *)
  let () = M.action_timeout mt in
  finished "on_timer" in
let rec repeating_timer ~interval ~interrupt ~f =
  (* Calls [f ()] every [interval] milliseconds until [interrupt] becomes
     determined.  Each round races the interrupt against a fresh delay; when
     the delay wins, [f] runs and the next round is armed. *)
  let open Async in
  let period = Core.Time.Span.of_ms (float_of_int interval) in
  let stop = choice interrupt (fun () -> ()) in
  let tick =
    choice (after period)
      (fun () -> f (); repeating_timer ~interval ~interrupt ~f) in
  Deferred.don't_wait_for (choose [stop; tick]) in
let start_new_timer timeout =
  (* Timer callback handed to the multi handle.  [timeout] is in
     milliseconds: -1 cancels the current timer, 0 asks for the timeout
     action as soon as possible, any other value arms [repeating_timer]
     with that interval.  In every case the previously armed timer is
     cancelled first. *)
  let open Async in
  let new_timer_cancel = Ivar.create () in
  let old_timer_cancel = !timer_cancel in
  timer_cancel := new_timer_cancel;
  Ivar.fill old_timer_cancel ();
  (match timeout with
   | -1 -> ()
   | 0 ->
     (* Do not call back into curl from inside its own timer callback:
        libcurl warns that doing so with a zero timeout can recurse.  Run
        the action in a later Async job instead, and skip it when this
        timer has been replaced in the meantime. *)
     upon (Deferred.return ())
       (fun () -> if Ivar.is_empty new_timer_cancel then on_timer ())
   | interval ->
     repeating_timer ~interval ~interrupt:(Ivar.read new_timer_cancel)
       ~f:on_timer) in
(* Register the timer callback on the multi handle. *)
M.set_timer_function mt start_new_timer;
(* Integer form of a file descriptor; used by [logFdState] for its output. *)
(let getFdId fd = Core.Unix.File_descr.to_int fd in
let logFdState msg fd =
  (* Trace helper: when [debug] is set, logs [msg] together with the
     bookkeeping recorded for [fd] in the four fd tables.  Does nothing
     otherwise. *)
  if debug then begin
    let state =
      (* An fd may be logged before any state was recorded for it (for
         example when its first event is POLL_NONE); report that rather
         than raising Not_found from the logger. *)
      match Hashtbl.find_opt fdState fd with
      | Some `Closed -> "closed"
      | Some `Watching -> "watching"
      | None -> "unknown" in
    let inMap = Hashtbl.mem fdMap fd in
    let closing = Hashtbl.mem fdClosing fd in
    let creatingAfterClose = Hashtbl.mem fdCreateAfterClose fd in
    Core.Log.infof
      "fd id=%d msg=%s state=%s, inMap=%b, closing=%b, creatingAfterClose=%b"
      (getFdId fd) msg state inMap closing creatingAfterClose
  end in
let createAsyncFd fd =
  (* Wraps the raw descriptor [fd] in an Async fd when [fdState] marks it
     as [`Watching]; yields [Error `Closed] when it is marked [`Closed].
     Raises Not_found when no state is recorded for [fd]. *)
  let state = Hashtbl.find fdState fd in
  match state with
  | `Closed ->
    logFdState "createAsyncFd `Closed" fd;
    Error `Closed
  | `Watching ->
    logFdState "createAsyncFd `Watching" fd;
    let info = Core.Info.of_string "curl_async" in
    let kind = Async.Unix.Fd.Kind.Socket `Active in
    Ok (Async.Unix.Fd.create ~avoid_nonblock_if_possible:false kind fd info) in
let getAsyncFd fd =
  (* Returns a deferred holding the result of [createAsyncFd] for [fd]:
     - not cached and no re-creation pending: create it now;
     - not cached but a re-creation is pending: wait for that one;
     - cached and not being closed: return the cached value;
     - cached but being closed: wait for the close to finish, then create
       a fresh async fd (only one such re-creation is scheduled per fd). *)
  let open Async in
  (* Only usable async fds are cached.  Caching [Error `Closed] would make
     every later request for the same fd number return the stale error, so
     a new socket reusing that number would never be watched. *)
  let remember asyncFd =
    match asyncFd with
    | Ok _ -> Hashtbl.replace fdMap fd asyncFd
    | Error _ -> () in
  match Hashtbl.find_opt fdMap fd with
  | None ->
    (match Hashtbl.find_opt fdCreateAfterClose fd with
     | Some deferredAsyncFdIvar ->
       logFdState "waiting for createAfterClose" fd;
       Ivar.read deferredAsyncFdIvar
     | None ->
       logFdState "creating asyncFd" fd;
       let asyncFd = createAsyncFd fd in
       remember asyncFd;
       Deferred.return asyncFd)
  | Some asyncFd ->
    (match Hashtbl.find_opt fdClosing fd with
     | None ->
       logFdState "returning found asyncFd" fd;
       Deferred.return asyncFd
     | Some closingIvar ->
       logFdState "waiting for close" fd;
       (match Hashtbl.find_opt fdCreateAfterClose fd with
        | Some deferredAsyncFdIvar ->
          logFdState "found createAfterClose" fd;
          Ivar.read deferredAsyncFdIvar
        | None ->
          logFdState "creating after close finishes" fd;
          let deferredAsyncFdIvar = Ivar.create () in
          Hashtbl.add fdCreateAfterClose fd deferredAsyncFdIvar;
          upon (Ivar.read closingIvar)
            (fun () ->
               Hashtbl.remove fdCreateAfterClose fd;
               (* The fd state may have changed while the close was in
                  flight, so this can legitimately yield an error. *)
               let asyncFd = createAsyncFd fd in
               remember asyncFd;
               Ivar.fill deferredAsyncFdIvar asyncFd);
          Ivar.read deferredAsyncFdIvar)) in
let closeFd unixFd =
  (* Releases the Async fd cached for [unixFd], if any, without closing the
     underlying descriptor.  While the release is in flight the fd is
     listed in [fdClosing]; when it completes the fd is dropped from
     [fdMap] and [fdClosing] and the closing ivar is filled so that
     [getAsyncFd] callers waiting on it can proceed.  Re-raises any
     exception thrown synchronously by the close call after logging it. *)
  let open Async in
  match Hashtbl.find_opt fdMap unixFd with
  | None -> logFdState "closeFd, not in map" unixFd
  | Some asyncFd ->
    (match Hashtbl.find_opt fdClosing unixFd with
     | Some _ -> logFdState "closeFd, already closing" unixFd
     | None ->
       logFdState "closeFd closing" unixFd;
       (match asyncFd with
        | Error _ ->
          (* Nothing to close asynchronously.  Registering a closing ivar
             here would leave it unfilled forever and block every later
             [getAsyncFd] for this fd, so just drop the stale entry. *)
          Hashtbl.remove fdMap unixFd;
          logFdState "closeFd, unixFd is closing" unixFd
        | Ok asyncFd ->
          let closingIvar = Ivar.create () in
          Hashtbl.add fdClosing unixFd closingIvar;
          let closingDeferred =
            try
              Unix.Fd.close
                ~file_descriptor_handling:Unix.Fd.Do_not_close_file_descriptor
                asyncFd
            with
            | exn ->
              logFdState "close failed" unixFd;
              Core.Log.errorf "Error closing fd exn=%s"
                (Core.Exn.to_string exn);
              raise exn in
          upon closingDeferred
            (fun () ->
               Hashtbl.remove fdMap unixFd;
               Hashtbl.remove fdClosing unixFd;
               Ivar.fill closingIvar ();
               logFdState "closeFd finished" unixFd))) in
let createSocketWatchEventHandler ~readWrite ~unixFd =
  (* Starts, in the background, a watcher that calls [on_readable] or
     [on_writable] (chosen by [readWrite]) each time [unixFd] is ready, and
     returns the ivar whose filling stops that watcher.  Nothing is watched
     when the fd is marked [`Closed] or no async fd can be obtained. *)
  let open Async in
  let interrupt = Ivar.create () in
  (* Log how the watcher ended. *)
  let reportOutcome outcome =
    match outcome with
    | `Interrupted -> logFdState "`Interrupted" unixFd
    | `Closed -> logFdState "`Closed" unixFd
    | `Unsupported ->
      Core.Log.error "got `Unsupported adding async fd watcher in curl_async";
      logFdState "`Unsupported" unixFd
    | `Bad_fd ->
      Core.Log.error "got `Bad_fd adding async fd watcher in curl_async";
      logFdState "`Bad_fd" unixFd in
  (* Install the readiness loop on an async fd we managed to obtain. *)
  let watch asyncFd =
    logFdState "watching" unixFd;
    let onReady =
      match readWrite with
      | `Read -> on_readable unixFd
      | `Write -> on_writable unixFd in
    Unix.Fd.interruptible_every_ready_to asyncFd readWrite
      ~interrupt:(Ivar.read interrupt) onReady ()
    >>| reportOutcome in
  let startWatching () =
    getAsyncFd unixFd >>= fun lookup ->
    logFdState "watch handler" unixFd;
    match lookup with
    | Error `Closed ->
      logFdState "skip watching for closed state (2)" unixFd;
      Deferred.return ()
    | Ok asyncFd -> watch asyncFd in
  Deferred.don't_wait_for
    (match Hashtbl.find fdState unixFd with
     | `Closed ->
       logFdState "skip watching for closed fd" unixFd;
       Deferred.return ()
     | `Watching -> startWatching ());
  interrupt in
let processSocketEvent unixFd what =
  (* Socket callback handed to the multi handle.  Records the new state of
     [unixFd], stops every watcher currently installed for it, and then,
     depending on [what], either releases the async fd (POLL_REMOVE),
     does nothing more (POLL_NONE), or installs fresh read and/or write
     watchers whose interrupts are stored in [all_events]. *)
  let open Async in
  (* [Hashtbl.replace], not [add]: [add] would stack a new shadowing binding
     on every event and the table would grow without bound. *)
  (match what with
   | M.POLL_REMOVE -> Hashtbl.replace fdState unixFd `Closed
   | M.POLL_NONE -> ()
   | M.POLL_IN | M.POLL_OUT | M.POLL_INOUT ->
     Hashtbl.replace fdState unixFd `Watching);
  logFdState
    (Printf.sprintf "processSocketEvent %s"
       (match what with
        | M.POLL_REMOVE -> "remove"
        | M.POLL_NONE -> "none"
        | M.POLL_IN -> "in"
        | M.POLL_OUT -> "out"
        | M.POLL_INOUT -> "inout"))
    unixFd;
  (* Stop the watchers installed by the previous event for this fd. *)
  (match Hashtbl.find_opt all_events unixFd with
   | Some events ->
     Hashtbl.remove all_events unixFd;
     List.iter (fun interrupt -> Ivar.fill interrupt ()) events
   | None -> ());
  let events =
    (match what with
     | M.POLL_REMOVE -> closeFd unixFd; []
     | M.POLL_NONE -> []
     | M.POLL_IN ->
       [createSocketWatchEventHandler ~readWrite:`Read ~unixFd]
     | M.POLL_OUT ->
       [createSocketWatchEventHandler ~readWrite:`Write ~unixFd]
     | M.POLL_INOUT ->
       [createSocketWatchEventHandler ~readWrite:`Read ~unixFd;
        createSocketWatchEventHandler ~readWrite:`Write ~unixFd]) in
  (match events with
   | [] -> ()
   | _ -> Hashtbl.replace all_events unixFd events) in
(* Register the socket callback on the multi handle, then return the handle
   together with the [all_events] and [wakeners] tables. *)
M.set_socket_function mt processSocketEvent; { mt; all_events; wakeners })
(** Multi handle used by every [perform] call; built once, when this
    top-level binding is evaluated. *)
let global = create ()
(** [perform h] adds the easy handle [h] to the shared multi handle and
    returns a deferred that is determined with the transfer's result code
    once [h] is reported as finished. *)
let perform h =
  let { mt; wakeners; _ } = global in
  let wakener = Async.Ivar.create () in
  Hashtbl.add wakeners h wakener;
  M.add mt h;
  Async.Ivar.read wakener
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment