Last active
April 23, 2020 00:19
-
-
Save fxfactorial/3bd019f46f0d1b64450235330defb54d to your computer and use it in GitHub Desktop.
Marshaling OCaml closures (functions), Unix Pipes, Lwt socket, Parent child forking
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(* EXAMPLE 1 *)
(* Marshals a closure, forks, and hands the marshaled bytes to the child
   through a Unix pipe. The child unmarshals the closure and calls it; the
   parent calls the same function directly, so each process prints its own
   pid.

   [Marshal.Closures] only round-trips between processes running the very
   same binary, which a fork guarantees. *)
let () =
  let parent_pid = Unix.getpid () in
  (* [print_endline] flushes stdout, so nothing buffered is duplicated into
     the child by the fork below. *)
  Printf.sprintf "EXAMPLE 1: Parent pid:%d" parent_pid |> print_endline;
  let some_func () =
    Printf.sprintf "Process: %d" (Unix.getpid ())
    |> print_endline
  in
  let marshaled = Marshal.to_bytes some_func [Marshal.Closures] in
  let (read, write) = Unix.pipe () in
  match Unix.fork () with
  | 0 ->
    (* Child: it only reads, so drop the inherited write end. *)
    Unix.close write;
    let in_chan = Unix.in_channel_of_descr read in
    let func : unit -> unit = Marshal.from_channel in_chan in
    (* Closing the channel also closes the underlying read descriptor. *)
    close_in in_chan;
    func ()
  | child_pid ->
    (* Parent: it only writes, so drop the read end. *)
    Unix.close read;
    (* [Unix.write] keeps writing until all bytes are out or an error is
       raised, so the returned count carries no extra information here. *)
    ignore (Unix.write write marshaled 0 (Bytes.length marshaled) : int);
    Unix.close write;
    some_func ();
    (* Reap the child so it is not left behind as a zombie/orphan and its
       output appears before the parent terminates. *)
    let (_ : int * Unix.process_status) = Unix.waitpid [] child_pid in
    exit 0
(* EXAMPLE 2 *)
open Lwt.Infix | |
(* [send fd msg] transmits the whole of [msg] on [fd] and resolves to the
   number of bytes sent, i.e. [Bytes.length msg].

   A single [Lwt_unix.send] may accept only part of the buffer on a stream
   socket, so the call is repeated from the current offset until nothing
   is left. *)
let send fd msg =
  let total = Bytes.length msg in
  let rec push offset =
    if offset >= total then Lwt.return total
    else
      Lwt_unix.send fd msg offset (total - offset) [] >>= fun sent ->
      push (offset + sent)
  in
  push 0
(* Example 2 as an Lwt promise: prints the parent pid, creates a Unix
   socket pair, forks, and sends a marshaled Lwt closure from the parent to
   the child. The child unmarshals the closure and runs it; the parent
   waits for the child to terminate before resolving. *)
let program =
  let parent_pid = Unix.getpid () in
  let some_func () =
    Printf.sprintf "EXAMPLE 2: Process: %d" (Unix.getpid ())
    |> Lwt_io.printl >>= fun () -> Lwt_io.flush Lwt_io.stdout
  in
  let marshaled = Marshal.to_bytes some_func [Marshal.Closures] in
  let len = Bytes.length marshaled in
  (* A single [Lwt_unix.read] may return fewer bytes than requested; keep
     reading until [remaining] bytes have arrived, and fail with
     [End_of_file] if the peer closes the socket early. *)
  let rec read_exactly fd buf off remaining =
    if remaining = 0 then Lwt.return_unit
    else
      Lwt_unix.read fd buf off remaining >>= function
      | 0 -> Lwt.fail End_of_file
      | n -> read_exactly fd buf (off + n) (remaining - n)
  in
  Printf.sprintf "Parent pid:%d" parent_pid
  |> Lwt_io.printl >>= fun () ->
  (* Flush before forking so buffered output is not emitted twice. *)
  Lwt_io.flush Lwt_io.stdout >>= fun () ->
  let (parents, childs) =
    Lwt_unix.socketpair Lwt_unix.PF_UNIX Lwt_unix.SOCK_STREAM 0
  in
  let pid = Lwt_unix.fork () in
  if pid = 0 then begin
    (* Child: release the parent's end, then receive the full payload. *)
    Lwt_unix.close parents >>= fun () ->
    let buf = Bytes.create len in
    read_exactly childs buf 0 len >>= fun () ->
    Lwt_unix.close childs >>= fun () ->
    let deserialized : unit -> unit Lwt.t = Marshal.from_bytes buf 0 in
    deserialized ()
  end
  else begin
    (* Parent: release the child's end, send, then reap the child. *)
    Lwt_unix.close childs >>= fun () ->
    send parents marshaled >>= fun _sent ->
    Lwt_unix.close parents >>= fun () ->
    Lwt_unix.waitpid [] pid >|= fun _status -> ()
  end
(* Entry point: drive the [program] promise to completion on the Lwt
   scheduler. *)
let () =
  program |> Lwt_main.run
(* EXAMPLE 3 *)
open Lwt.Infix | |
(* [send fd msg] transmits the whole of [msg] on [fd] and resolves to the
   number of bytes sent, i.e. [Bytes.length msg].

   A single [Lwt_unix.send] may accept only part of the buffer on a stream
   socket, so the call is repeated from the current offset until nothing
   is left. *)
let send fd msg =
  let total = Bytes.length msg in
  let rec push offset =
    if offset >= total then Lwt.return total
    else
      Lwt_unix.send fd msg offset (total - offset) [] >>= fun sent ->
      push (offset + sent)
  in
  push 0
(* Value reported by [Corecount.count], converted from [nativeint] to
   [int]; presumably the number of CPU cores (confirm against the
   Corecount library). Used as the upper bound of the fork loop. *)
let core_count =
  let reported = Corecount.count () in
  Nativeint.to_int reported
(* Example 3 as an Lwt promise: prints the parent pid, then forks
   [core_count] children. Each child receives a marshaled Lwt closure over
   its own socket pair, runs it, and leaves the fork loop by failing with
   the local exception [Iter_stop]; the handler then prints a final line in
   that child. The parent sends the payload to every child and waits for
   all of them before resolving.

   Requires the ppx_lwt syntax extension ([try%lwt], [for%lwt]). *)
let program =
  let parent_pid = Unix.getpid () in
  let some_func () =
    Printf.sprintf "EXAMPLE 3: Process: %d" (Unix.getpid ())
    |> Lwt_io.printl >>= fun () -> Lwt_io.flush Lwt_io.stdout
  in
  let marshaled = Marshal.to_bytes some_func [Marshal.Closures] in
  let len = Bytes.length marshaled in
  (* A single [Lwt_unix.read] may return fewer bytes than requested; keep
     reading until [remaining] bytes have arrived, and fail with
     [End_of_file] if the peer closes the socket early. *)
  let rec read_exactly fd buf off remaining =
    if remaining = 0 then Lwt.return_unit
    else
      Lwt_unix.read fd buf off remaining >>= function
      | 0 -> Lwt.fail End_of_file
      | n -> read_exactly fd buf (off + n) (remaining - n)
  in
  Printf.sprintf "Parent pid:%d" parent_pid
  |> Lwt_io.printl >>= fun () ->
  (* Flush before forking so buffered output is not emitted twice. *)
  Lwt_io.flush Lwt_io.stdout >>= fun () ->
  let exception Iter_stop in
  (* Pids of the children forked so far, most recent first. *)
  let children = ref [] in
  try%lwt
    (* [1 to core_count] forks exactly [core_count] children; the former
       [0 to core_count] bound forked one more than that. *)
    (for%lwt _i = 1 to core_count do
       (* One socket pair per child: with a single shared stream socket,
          concurrent readers could each receive a fragment of a message. *)
       let (parents, childs) =
         Lwt_unix.socketpair Lwt_unix.PF_UNIX Lwt_unix.SOCK_STREAM 0
       in
       let pid = Lwt_unix.fork () in
       if pid = 0 then begin
         Lwt_unix.close parents >>= fun () ->
         let buf = Bytes.create len in
         read_exactly childs buf 0 len >>= fun () ->
         Lwt_unix.close childs >>= fun () ->
         let deserialized : unit -> unit Lwt.t = Marshal.from_bytes buf 0 in
         deserialized () >>= fun () ->
         (* Break out of the loop so the child does not fork in turn. *)
         Lwt.fail Iter_stop
       end
       else begin
         children := pid :: !children;
         Lwt_unix.close childs >>= fun () ->
         send parents marshaled >>= fun _sent ->
         Lwt_unix.close parents
       end
     done) >>= fun () ->
    (* Only the parent gets here: reap every child in fork order. *)
    Lwt_list.iter_s
      (fun pid -> Lwt_unix.waitpid [] pid >|= fun _status -> ())
      (List.rev !children)
  with Iter_stop ->
    Lwt_io.printlf "Child continued separated"
(* Entry point: drive the [program] promise to completion on the Lwt
   scheduler. *)
let () =
  program |> Lwt_main.run
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment