Created
April 20, 2012 14:03
-
-
Save hcarty/2428922 to your computer and use it in GitHub Desktop.
Lwt thread generation and consumption (now with zeromq)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(*
#camlp4o;;
#require "lwt.unix";;
#require "lwt.syntax";;
#require "ZMQ";;
Or compile with:
ocamlbuild -use-ocamlfind target.native
*)
(** Minimal Lwt adapter over ZeroMQ sockets. *)
module Lwt_zmq = struct
  module Socket = struct
    (** A ZeroMQ socket together with the Lwt handle built from the
        descriptor returned by [ZMQ.Socket.get_fd]. *)
    type 'a t = {
      socket : 'a ZMQ.Socket.t;
      fd : Lwt_unix.file_descr;
    }

    (** [of_socket socket] pairs [socket] with a non-blocking Lwt file
        descriptor obtained from [ZMQ.Socket.get_fd socket]. *)
    let of_socket socket =
      let raw_fd = ZMQ.Socket.get_fd socket in
      { socket; fd = Lwt_unix.of_unix_file_descr ~blocking:false raw_fd }

    (** [wrap event f s] runs [f] on the underlying ZeroMQ socket of [s]
        through [Lwt_unix.wrap_syscall event s.fd].

        - A ZeroMQ [EAGAIN] is converted into [Lwt_unix.Retry].
        - A ZeroMQ [EINTR] is reported on stderr and [f] is called again
          immediately, as many times as needed.
        - Any other exception raised by [f] propagates unchanged.

        NOTE(review): the ZeroMQ documentation describes the ZMQ_FD
        descriptor as signalling through read-readiness only, with the
        real state to be fetched from ZMQ_EVENTS. Waiting on
        [Lwt_unix.Write] (see [send]) may therefore not behave as
        intended; confirm against the binding in use. *)
    let wrap event f s =
      (* [notice] is the message printed if this attempt is interrupted;
         the first attempt and the following ones use different texts. *)
      let rec attempt notice =
        try f s.socket with
        | ZMQ.ZMQ_exception (ZMQ.EAGAIN, _) -> raise Lwt_unix.Retry
        | ZMQ.ZMQ_exception (ZMQ.EINTR, _) ->
          prerr_endline notice;
          attempt "EINTR again"
      in
      Lwt_unix.wrap_syscall event s.fd (fun () -> attempt "EINTR")

    (** [recv s] performs a non-blocking [ZMQ.Socket.recv] on [s] via
        [wrap], registered for [Lwt_unix.Read]. *)
    let recv s =
      let receive raw = ZMQ.Socket.recv ~opt:ZMQ.Socket.R_no_block raw in
      wrap Lwt_unix.Read receive s

    (** [send s m] performs a non-blocking [ZMQ.Socket.send] of [m] on [s]
        via [wrap], registered for [Lwt_unix.Write]. *)
    let send s m =
      let transmit raw = ZMQ.Socket.send ~opt:ZMQ.Socket.S_no_block raw m in
      wrap Lwt_unix.Write transmit s
  end
end
(** The two endless halves of a request/reply exchange over
    [Lwt_zmq.Socket]. *)
module Z = struct
  (** [req socket] sends "request", waits for the next message and starts
      over when that message is "reply". Any other message makes the
      thread fail through [assert_lwt false]. *)
  let rec req socket =
    lwt () = Lwt_zmq.Socket.send socket "request" in
    lwt answer = Lwt_zmq.Socket.recv socket in
    match answer with
    | "reply" -> req socket
    | _ -> assert_lwt false

  (** [rep socket] waits for a message, requires it to be "request"
      (failing through [assert_lwt false] otherwise), answers with
      "reply" and starts over. *)
  let rec rep socket =
    lwt incoming = Lwt_zmq.Socket.recv socket in
    lwt () =
      match incoming with
      | "request" -> Lwt.return ()
      | _ -> assert_lwt false
    in
    lwt () = Lwt_zmq.Socket.send socket "reply" in
    rep socket
end
(** Local alias for the [Lwt.( <?> )] operator, used by [main]. *)
let ( <?> ) = Lwt.( <?> )

(** Global counter; [make] reads it to number a worker and then
    increments it. *)
let count = ref 0
(** Outcome of one step of the [main] loop. *)
type 'a thread_t =
  | Create of 'a Lwt.t
      (** A new thread to add to the tracked set (produced by [make]). *)
  | Result of 'a list * 'a Lwt.t list
      (** Values of the threads that finished, paired with the threads
          still tracked (produced by [run]). *)
(** [run threads] passes [threads] to [Lwt.nchoose_split] and wraps the
    resulting pair in [Result]. *)
let run threads =
  Lwt.bind (Lwt.nchoose_split threads) (fun (finished, pending) ->
    Lwt.return (Result (finished, pending)))
(** [make ()] takes the current value of [count] as the worker number,
    increments [count], sleeps for 0.1 second and then resolves to
    [Create worker].

    [worker] is started only after the sleep. It passes the shell command
    [sleep 0.5] to [Lwt_process.with_process_full] with a callback that
    returns immediately, then prints [Done <number>!]. *)
let make () =
  let id = !count in
  incr count;
  (* Kept as a thunk so that the process is spawned after the delay below,
     not when [make] is called. *)
  let start_worker () =
    let job = Lwt_process.shell "sleep 0.5" in
    lwt () = Lwt_process.with_process_full job (fun _proc -> Lwt.return ()) in
    Lwt_io.printlf "Done %d!" id
  in
  lwt () = Lwt_unix.sleep 0.1 in
  Lwt.return (Create (start_worker ()))
(** [main maker threads] is the scheduling loop. Each step combines
    [maker] with [run threads] using [<?>] and dispatches on whichever
    outcome is delivered:

    - [Create worker]: logs the addition, then loops with a fresh
      [make ()] and [worker] prepended to [threads].
    - [Result (finished, pending)]: logs both counts, then loops with the
      same [maker] and only the [pending] threads.

    Both branches recurse, so the returned thread never resolves
    normally. *)
let rec main maker threads =
  lwt outcome = maker <?> run threads in
  match outcome with
  | Result (finished, pending) ->
    let finished_count = List.length finished in
    let pending_count = List.length pending in
    lwt () =
      Lwt_io.printlf "%d threads complete, %d remain"
        finished_count pending_count
    in
    main maker pending
  | Create worker ->
    lwt () = Lwt_io.printl "Added a thread" in
    main (make ()) (worker :: threads)
(** Program entry point.

    Creates a ZeroMQ context with a REQ socket bound to [tcp://*:5555] and
    a REP socket connected to [tcp://127.0.0.1:5555], then runs [main]
    with the two [Z] threads and a first [make ()].

    [main] never resolves normally, so [Lwt_main.run] can only be left
    through an exception. The sockets and the context are therefore
    released on the failure path as well, after which the exception is
    re-raised. *)
let () =
  let context = ZMQ.init () in
  let req = ZMQ.Socket.create context ZMQ.Socket.req in
  let rep = ZMQ.Socket.create context ZMQ.Socket.rep in
  (* Same release order as before: REP, REQ, then the context. *)
  let cleanup () =
    ZMQ.Socket.close rep;
    ZMQ.Socket.close req;
    ZMQ.term context
  in
  let serve () : unit =
    ZMQ.Socket.bind req "tcp://*:5555";
    ZMQ.Socket.connect rep "tcp://127.0.0.1:5555";
    Lwt_main.run (
      main (make ()) [
        Z.req (Lwt_zmq.Socket.of_socket req);
        Z.rep (Lwt_zmq.Socket.of_socket rep);
      ]
    )
  in
  (* Hand-rolled "finally": a failure in bind, connect or the Lwt loop must
     not skip the cleanup. *)
  (try serve () with exn -> cleanup (); raise exn);
  cleanup ()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment