Skip to content

Instantly share code, notes, and snippets.

@hcarty
Created April 1, 2012 23:56
Show Gist options
  • Save hcarty/2279558 to your computer and use it in GitHub Desktop.
Save hcarty/2279558 to your computer and use it in GitHub Desktop.
Attempt at Lwt-enabled zeromq
module Lwt_socket = struct
type 'a t = {
socket : 'a ZMQ.Socket.t;
fd : Lwt_unix.file_descr;
}
let of_socket socket = {
socket;
fd = Lwt_unix.of_unix_file_descr ~blocking:false (ZMQ.Socket.get_fd socket);
}
let wrap event f s =
Lwt_unix.wrap_syscall event s.fd (
fun () ->
try
f s.socket
with
| ZMQ.ZMQ_exception (ZMQ.EAGAIN, _) -> raise Lwt_unix.Retry
)
let recv s =
wrap Lwt_unix.Read (fun s -> ZMQ.Socket.recv ~opt:ZMQ.Socket.R_no_block s) s
let send s m =
wrap Lwt_unix.Write (fun s -> ZMQ.Socket.send ~opt:ZMQ.Socket.S_no_block s m) s
end
module Lwt_socket : sig
(** An Lwt-wrapped zeromq socket *)
type 'a t
(** [of_socket s] wraps the zeromq socket [s] for use with Lwt *)
val of_socket : 'a ZMQ.Socket.t -> 'a t
(** [recv socket] waits for a message on [socket] without blocking other Lwt threads *)
val recv : 'a t -> string Lwt.t
(** [send socket] sends a message on [socket] without blocking other Lwt threads *)
val send : 'a t -> string -> unit Lwt.t
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment