Skip to content

Instantly share code, notes, and snippets.

@hcarty
Created April 20, 2012 14:03
Show Gist options
  • Save hcarty/2428922 to your computer and use it in GitHub Desktop.
Save hcarty/2428922 to your computer and use it in GitHub Desktop.
Lwt thread generation and consumption (now with zeromq)
<*.*>: package(lwt.unix), package(lwt.syntax), package(ZMQ), syntax(camlp4o)
(*
To load in the toplevel:
#camlp4o;;
#require "lwt.unix";;
#require "lwt.syntax";;
#require "ZMQ";;
Or compile with ocamlbuild:
ocamlbuild -use-ocamlfind target.native
*)
(** Minimal glue that lets ZeroMQ sockets be driven from Lwt threads. *)
module Lwt_zmq = struct
  module Socket = struct
    (** A ZeroMQ socket together with the Lwt handle on the file descriptor
        returned by [ZMQ.Socket.get_fd] for that socket. *)
    type 'a t = {
      socket : 'a ZMQ.Socket.t;
      fd : Lwt_unix.file_descr;
    }

    (** [of_socket socket] pairs [socket] with its notification descriptor,
        handed to Lwt with [~blocking:false]. *)
    let of_socket socket = {
      socket;
      fd = Lwt_unix.of_unix_file_descr ~blocking:false (ZMQ.Socket.get_fd socket);
    }

    (** [wrap event f s] runs [f] on the underlying socket of [s] through
        [Lwt_unix.wrap_syscall], waiting for [event] on the descriptor of [s].

        - [EAGAIN] from ZeroMQ is turned into [Lwt_unix.Retry], so that
          [wrap_syscall] waits and calls the action again.
        - [EINTR] is logged on stderr and the call is retried immediately.
        - Any other exception raised by [f] propagates unchanged. *)
    let wrap event f s =
      (* [notice] is the text logged if this attempt is interrupted: the
         first interruption of a call logs one message, later ones another. *)
      let rec attempt notice =
        try
          f s.socket
        with
        | ZMQ.ZMQ_exception (ZMQ.EAGAIN, _) -> raise Lwt_unix.Retry
        | ZMQ.ZMQ_exception (ZMQ.EINTR, _) ->
          prerr_endline notice;
          attempt "EINTR again"
      in
      Lwt_unix.wrap_syscall event s.fd (fun () -> attempt "EINTR")

    (** [recv s] receives one message without blocking the Lwt scheduler. *)
    let recv s =
      wrap Lwt_unix.Read (fun s -> ZMQ.Socket.recv ~opt:ZMQ.Socket.R_no_block s) s

    (** [send s m] sends [m] without blocking the Lwt scheduler.

        The wait is on [Lwt_unix.Read], not [Lwt_unix.Write]: ZeroMQ reports
        every pending event, including the socket becoming writable, by
        making its notification descriptor readable. Waiting for the
        descriptor to be writable would typically succeed at once and turn a
        would-block send into a busy retry loop. *)
    let send s m =
      wrap Lwt_unix.Read (fun s -> ZMQ.Socket.send ~opt:ZMQ.Socket.S_no_block s m) s
  end
end
(** The two sides of an endless request/reply exchange. *)
module Z = struct
  (** [req socket] sends the request message, waits for the reply message and
      starts over. Any other incoming message fails the assertion. *)
  let rec req socket =
    Lwt.bind (Lwt_zmq.Socket.send socket "request") (fun () ->
      Lwt.bind (Lwt_zmq.Socket.recv socket) (function
        | "reply" -> req socket
        | _ -> assert_lwt false))

  (** [rep socket] waits for the request message, answers with the reply
      message and starts over. Any other incoming message fails the
      assertion. *)
  let rec rep socket =
    let check_request = function
      | "request" -> Lwt.return ()
      | _ -> assert_lwt false
    in
    let request_seen = Lwt.bind (Lwt_zmq.Socket.recv socket) check_request in
    Lwt.bind request_seen (fun () ->
      Lwt.bind (Lwt_zmq.Socket.send socket "reply") (fun () ->
        rep socket))
end
(* Bring the infix choice operator of Lwt into scope so that [main] can use
   it unqualified. *)
let ( <?> ) = Lwt.( <?> )
(* Number of calls to [make] so far. Each call reads the current value as the
   id it later prints, then increments the counter. *)
let count = ref 0
(* The two kinds of event handled by [main]. *)
type 'a thread_t =
(* [Create t]: a new thread [t], produced by [make], to add to the pool. *)
| Create of 'a Lwt.t
(* [Result (complete, remaining)]: the two lists produced by
   [Lwt.nchoose_split] in [run]. *)
| Result of 'a list * 'a Lwt.t list
(** [run threads] waits on [threads] with [Lwt.nchoose_split] and packages
    the two lists it returns as a [Result]. *)
let run threads =
  Lwt.bind
    (Lwt.nchoose_split threads)
    (fun (complete, remaining) -> Lwt.return (Result (complete, remaining)))
(** [make ()] takes the next id from [count], sleeps for 0.1 seconds, and
    then returns [Create t], where [t] is a thread that runs a shell sleep
    command through [Lwt_process.with_process_full] and afterwards prints a
    completion line carrying the id. The thread [t] is only started once the
    0.1 second sleep is over. *)
let make () =
  let id = !count in
  count := id + 1;
  (* Start the worker: run the external command, then report completion. *)
  let spawn () =
    let command = Lwt_process.shell "sleep 0.5" in
    Lwt.bind
      (Lwt_process.with_process_full command (fun _proc -> Lwt.return ()))
      (fun () -> Lwt_io.printlf "Done %d!" id)
  in
  Lwt.bind (Lwt_unix.sleep 0.1) (fun () -> Lwt.return (Create (spawn ())))
(** [main maker threads] is the scheduling loop. It waits for whichever comes
    first: [maker] delivering a new thread, or [run threads] reporting that
    some threads finished.

    - On [Create t], it logs the addition, requests another thread with
      [make ()] and continues with [t] added to the pool.
    - On [Result (results, remaining)], it logs both counts and continues
      with the same [maker] and only the [remaining] threads.

    The loop never returns a value by itself. *)
let rec main maker threads =
  let step = function
    | Create t ->
      Lwt.bind
        (Lwt_io.printl "Added a thread")
        (fun () -> main (make ()) (t :: threads))
    | Result (results, remaining) ->
      let finished = List.length results in
      let pending = List.length remaining in
      Lwt.bind
        (Lwt_io.printlf "%d threads complete, %d remain" finished pending)
        (fun () -> main maker remaining)
  in
  Lwt.bind (maker <?> run threads) step
(* Program entry point: create a REQ and a REP socket on one ZeroMQ context,
   bind the REQ side to TCP port 5555 and connect the REP side to it over
   loopback, then run the scheduling loop with the two exchange threads as
   the initial pool. The sockets and the context are released after the loop
   returns. *)
let () =
  let ctx = ZMQ.init () in
  let requester = ZMQ.Socket.create ctx ZMQ.Socket.req in
  let replier = ZMQ.Socket.create ctx ZMQ.Socket.rep in
  ZMQ.Socket.bind requester "tcp://*:5555";
  ZMQ.Socket.connect replier "tcp://127.0.0.1:5555";
  let workers = [
    Z.req (Lwt_zmq.Socket.of_socket requester);
    Z.rep (Lwt_zmq.Socket.of_socket replier);
  ] in
  Lwt_main.run (main (make ()) workers);
  ZMQ.Socket.close replier;
  ZMQ.Socket.close requester;
  ZMQ.term ctx;
  ()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment