Skip to content

Instantly share code, notes, and snippets.

@keigoi
Last active June 6, 2020 02:07
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save keigoi/3154282 to your computer and use it in GitHub Desktop.
session type in ocaml (proof-of-concept impl.)
channel.cmi :
mVar.cmi :
monitor.cmi :
session.cmi :
channel.cmo : monitor.cmi channel.cmi
channel.cmx : monitor.cmx channel.cmi
example.cmo : session.cmi
example.cmx : session.cmx
mVar.cmo : monitor.cmi mVar.cmi
mVar.cmx : monitor.cmx mVar.cmi
monitor.cmo : monitor.cmi
monitor.cmx : monitor.cmi
session.cmo : channel.cmi session.cmi
session.cmx : channel.cmx session.cmi
test.cmo : channel.cmi
test.cmx : channel.cmx
*.cm[ioxat]
*.cmxa
*.cmti
*.o
*.annot
test.*
*.top
*.byte
*.native
FLG -rectypes session.mli
B +threads
PKG threads unix
module Q = Queue
module M = Monitor
type 'a t = ('a Q.t ref) M.t
let create () : 'a t = M.create (ref (Q.create ()))
let send (t : 'a t) (v:'a) : unit =
M.lock t (fun q -> Q.add v !q; M.signal t)
let send_all (t : 'a t) (xs:'a list) : unit =
M.lock t (fun q -> List.iter (fun x -> Q.add x !q) xs; M.signal t)
let try_receive (t:'a t) : 'a option =
M.lock t (fun q -> if Q.is_empty !q then None else Some (Q.take !q))
let receive (t:'a t) : 'a =
M.wait t (fun q ->
if Q.is_empty !q then
M.WaitMore
else
M.Return (Q.take !q))
let clear_queue_ t =
M.lock t (fun q ->
let old = !q in
q := Q.create ();
old)
let receive_all (t:'a t) (func:'b -> 'a -> 'b) (init:'b) : 'b =
Q.fold func init (clear_queue_ t)
let receive_all_ (t:'a t) (func:'a -> unit) : unit =
receive_all t (fun _ x -> func x) ()
let peek (t:'a t) : 'a =
M.wait t (fun q ->
if Q.is_empty !q then
M.WaitMore
else
M.Return (Q.peek !q))
let clear (t:'a t) : unit =
ignore (clear_queue_ t)
let is_empty (t:'a t) : bool =
M.lock t (fun q -> Q.is_empty !q)
let length (t:'a t) : int =
M.lock t (fun q -> Q.length !q)
(** Concurrent, buffered channel *)
type 'a t
val create : unit -> 'a t
val send : 'a t -> 'a -> unit
val receive : 'a t -> 'a
val receive_all : 'a t -> ('b -> 'a -> 'b) -> 'b -> 'b
val receive_all_ : 'a t -> ('a -> unit) -> unit
val peek : 'a t -> 'a
val clear : 'a t -> unit
val is_empty : 'a t -> bool
val length : 'a t -> int
open Session
module Example1 = struct
open Session0
let neg_server () =
recv () >>= fun x ->
send (-x) >>
close ()
let neg_client () =
send 12345 >>
recv () >>= fun x ->
print_int x; (* -12345 を印字 *)
close ()
let neg_ch = new_channel ()
let _ =
ignore @@ Thread.create
(accept_ neg_ch neg_server) ();
connect_ neg_ch neg_client ()
end
module Example2 = struct
open Session0
open Example1
type binop = Add | Sub | Mul | Div
let eval_binop = function
| Add -> (+) | Sub -> (-)
| Mul -> ( * ) | Div -> (/)
let rec arith_server () =
_branch_start (function
| `neg(p),r -> _branch (p,r) (neg_server ())
| `bin(p),r -> _branch (p,r) (binop_server ())
| `fin(p),r -> _branch (p,r) (close ()): [`neg of 'p1 | `bin of 'p2 | `fin of 'p3] * 'a -> 'b)
and binop_server () =
recv () >>= fun op ->
recv () >>= fun (x,y) ->
send (eval_binop op x y) >>
arith_server ()
let arith_client () =
_select (fun x -> `bin(x)) >>
send Add >> send (150, 250) >>
recv () >>= fun ans ->
print_int ans;
_select (fun x -> `fin(x)) >>
close () >>
return ()
let arith_ch = new_channel ()
let _ =
ignore @@ Thread.create
(accept_ arith_ch arith_server) ();
connect_ arith_ch arith_client ()
end
module Example3 = struct
open SessionN
open Example1
open Example2
let rec main_thread dch () =
accept arith_ch ~bindto:_0 >>
connect dch ~bindto:_1 >>
deleg_send _1 ~release:_0 >>
close _1 >>=
main_thread dch
let rec worker_thread dch () =
accept dch ~bindto:_1 >>
deleg_recv _1 ~bindto:_0 >>
close _1 >>
arith_server () >>= (* _0 を使う *)
worker_thread dch
let _ =
let deleg_ch = new_channel () in
for i = 0 to 5 do
ignore @@ Thread.create
(run (worker_thread deleg_ch)) ()
done;
ignore @@ Thread.create (run (main_thread deleg_ch)) ();
ignore @@ Thread.create (Session0.connect_ arith_ch arith_client) ();
ignore @@ Thread.create (Session0.connect_ arith_ch arith_client) ();
Session0.connect_ arith_ch arith_client ();
Session0.connect_ arith_ch arith_client ();
Unix.sleep 1
end
module type LensTest = sig
val _0 : 'a * 'ss -> 'b -> ('b * 'a) * 'ss
val _1 : 's * ('a * 'ss) -> 'b -> 's * ('b * 'ss)
(* ... *)
end
(*
module type LinSession = sig
type (-'a, +'b) t
type 'a bang
val compose : ('a, 'b) t -> ('b, 'c) t -> ('a, 'c) t
val fst : ('a -> 'b) t -> ('a * 't, 'b * 't) t
val snd : ('a -> 'b) t -> ('t * 'a, 't * 'b) t
val connect : 'p channel -> (unit, ('p, cli) sess) t
val accept : 'p channel -> (unit, ('p, serv) sess) t
val bindto : 'v -> (unit, 'v) t
val out : ('a -> 'b) -> ('a bang,
val send : (([`msg * 'r1 * 'v * 'p], 'r1 * 'r2) sess * 'v bang, ('p, 'r1 * 'r2) sess) t
val recv : (([`msg * 'r2 * 'v * 'p], 'r1 * 'r2) sess, ('p, 'r1 * 'r2) sess * 'v bang) t
end
module type MonadAlt = sig
type (-'a, +'b) t
type ('a, 'b, 's, 't) lens
val fst : ('a, 'b) t -> ('a * 't, 'b * 't) t
val snd : ('a, 'b) t -> ('t * 'a, 't * 'b) t
val send : 'v -> ([`msg of 'r1 * 'v * 'p], 'p) t
val recv : ([`msg of 'r1 * 'v * 'p], 'p) t
val lift : ('a,'b,'ss,'tt) lens -> ('a, 'b, 'c) t -> ('ss, 'tt, 'c) t
end
module type MonadAlt = sig
type (-'a, +'b, +'c) t
val (>>) : ('a, 'b, unit) t -> ('b, 'c, 'w) t -> ('a, 'c, 'w) t
val (>>=) : ('a, 'b, 'v) t -> ('v -> ('b, 'c, 'w) t) -> ('a, 'c, 'w) t
val return : 'v -> ('a, 'a, 'v) t
val fst : ('a, 'b, 'c) t -> ('a * 't, 'b * 't, 'c) t
val snd : ('a, 'b, 'c) t -> ('t * 'a, 't * 'b, 'c) t
val send : 'v -> ([`msg of 'r1 * 'v * 'p], 'p, unit) t
val recv : unit -> ([`msg of 'r1 * 'v * 'p], 'p, 'v) t
end
module M(X:MonadAlt) = struct
open X
let _0 = fst
let _ = _0 (send 100) >> _1 (recv ()) >>
end
module type TcpSessionS = sig
type 'p net = Tcp.channel -> (('p, serv) sess * all_empty, all_empty, unit) session
val req : ('v -> string) -> 'p part -> [`msg of req * 'v * 'p] part
val resp : (string -> 'v) -> 'p part -> [`msg of resp * 'v * 'p] part
val sel :
: (('v1 -> string) * 'p1 part)
-> (('v2 -> string) * 'p2 part)
-> [`branch of req * [`left * 'p1 | `right * 'p2]] part
val bra :
: ((string -> 'v1 option) * 'p1 part)
-> ((string -> 'v2 option) * 'p2 part)
-> [`branch of resp * [`left * [`msg of resp * 'v1 * 'p1]
|`right * [`msg of resp * 'v2 * 'p2]]] part
val cls : [`close] part
end
module TcpSession : TcpSessionS = struct
type 'p net = Tcp.channel -> (('p, serv) sess * all_empty, all_empty, unit) session
open Session0
let req conv cont tcp =
recv () >>= fun v ->
Tcp.send_line tcp (conv v);
cont tcp
let resp conv cont tcp =
let line = Tcp.recv_line tcp in
send (conv line) >>= fun () ->
cont tcp
let sel cont1 cont2 tcp =
branch2
(fun () -> cont1 tcp)
(fun () -> cont2 tcp)
let bra (conv1,cont1) cont2 tcp =
let line = Tcp.recv_line tcp in
match conv1 line with
| Some(v) -> select_left () >> send v >>= fun () -> cont1 tcp
| None -> cont2
let cls tcp = Tcp.close tcp; close ()
let channel f opts =
let ch = new_channel () in
Thread.create (fun () ->
accept_ ch (fun () ->
let tcp = Tcp.connect opts in
f tcp)) ();
ch
end
type ehlo = EHLO of string
type mail = MAIL of string
type rcpt = RCPT of string
type data = DATA
type quit = QUIT
type r200 = R200 of string list
type r500 = R500 of string list
type R354 = R354 of string
module SMTP = struct
open TcpSession
let rec rcpt_part cont x =
begin
select (send rcpt @@ branch (r200, rcpt_part cont) (r500, send quit close))
cont
end @@ x
let rec mail_loop x =
begin
send mail @@ recv r200 @@
rcpt_part @@
send data @@ recv r354 @@
send string_list @@ recv r200 @@
select (send quit close)
mail_loop
end @@ x
let smtp_protcol =
recv r200 (send ehlo (recv r200, mail_loop ()))
end
let ch = TcpSession.channel SMTP.smtp_protocol opts
;;
connect_ch (fun () ->
send (EHLO("keigoimai.info")) >> recv () >>= fun (R200 str) ->
send (MAIL("keigo.imai@gmail.com")) >> recv () >>= fun (R200 str) ->
send (RCPT("keigoi@gifu-u.ac.jp")) >>
branch (fun () ->
recv () >>= fun (R200 str) ->
send DATA >> recv () >>= fun R354 ->
send (escape mailbody) >> recv () >>= fun (R200 str) ->
send QUIT >>
close ())
(fun () ->
recv () >>= fun (R500 str) ->
List.iter print_endline str;
send QUIT >>
close ())) ()
*)
(* newtype EHLO = EHLO String; newtype MAIL = MAIL String; newtype RCPT = RCPT String *)
(* data DATA = DATA; data QUIT = QUIT; *)
(* -- Types for SMTP server replies (200 OK, 500 error and 354 start mail input) *)
(* newtype R2yz = R2yz [String]; newtype R5yz = R5yz [String]; newtype R354 = R354 String *)
(* module type SerialSession = sig *)
(* type t *)
(* val run : Tcp.channel -> ((t, serv) sess * all_empty, empty, unit) session *)
(* end *)
(* module type Printer = sig *)
(* type v *)
(* val print : v -> string *)
(* end *)
(* module type Parser = sig *)
(* type v *)
(* val parse : string -> v option *)
(* end *)
(* module Send(V:Printer, P:SerialSession) : SerialSession *)
(* with type t = [`msg of req * V.t * P.t] = *)
(* struct *)
(* type t = [`msg of req * V.t * P.t] *)
(* let run c = *)
(* Session.recv () >>= fun x -> *)
(* Tcp.send c (V.print x); *)
(* P.run c *)
(* end;; *)
(* module Recv(V:Parser, P:SerialSession) : SerialSession *)
(* with type t = [`msg of resp * V.t * P.t] = *)
(* struct *)
(* type t = [`msg of req * V.t * P.t] *)
(* let run c = *)
(* let str = Tcp.recv c in *)
(* match V.parse str with *)
(* | Some(x) -> *)
(* Session.send (V.parse str) >> *)
(* P.run c *)
(* | None -> failwith "network parse error" *)
(* end;; *)
(* module Branch(V1:Parser, P1:SerialSession, V2:Parser, P2:SerialSession) : SerialSession *)
(* with type t = [`branch of resp * [`left * [`msg of resp * V1.t * P1.t] *)
(* | `right * [`msg of resp * V2.t * P2.t]]] = *)
(* struct *)
(* type t = [`branch of resp * [`left * [`msg of resp * V1.t * P1.t] *)
(* | `right * [`msg of resp * V2.t * P2.t]]] *)
(* let run c = *)
(* let str = Tcp.recv c in *)
(* match parse str with *)
(* | Some(x) -> *)
(* Session.select_left () >> *)
(* Session.send x >> *)
(* P1.run c *)
(* | None -> *)
(* end;; *)
(* module Example4 = struct *)
(* open SafeSession.Session0 *)
(* let rec smtpclient ch () = *)
(* _branch_start (function *)
(* | `EHLO(p,server) -> *)
(* end *)
(* _branch_start (function *)
(* | `neg(p),r -> _branch (p,r) (neg_server ()) *)
(* | `bin(p),r -> _branch (p,r) (binop_server ()) *)
(* | `fin(p),r -> _branch (p,r) (close ()): [`neg of 'p1 | `bin of 'p2 | `fin of 'p3] * 'a -> 'b) *)
OCAMLC=ocamlfind ocamlc -rectypes -thread -package unix,threads
OCAMLOPT=ocamlfind ocamlopt -rectypes -thread -package unix,threads
OCAMLMKTOP=ocamlfind ocamlmktop -rectypes -thread -package unix,threads
OCAMLDEP=ocamlfind ocamldep
INCLUDES= # all relevant -I options here
OCAMLFLAGS=$(INCLUDES) # add other options for ocamlc here
OCAMLOPTFLAGS=$(INCLUDES) # add other options for ocamlopt here
BYTE_OBJS=monitor.cmo mVar.cmo channel.cmo session.cmo example.cmo
CMI=$(BYTE_OBJS:%.cmo=%.cmi)
NATIVE_OBJS=$(BYTE_OBJS:%.cmo=%.cmx)
all: test.byte
test.native: $(NATIVE_OBJS) $(CMI)
$(OCAMLOPT) -linkpkg -o test.native $(OCAMLFLAGS) $(NATIVE_OBJS)
test.byte: $(BYTE_OBJS) $(CMI)
$(OCAMLC) -linkpkg -o test.byte $(OCAMLFLAGS) $(BYTE_OBJS)
test.top: $(BYTE_OBJS) $(CMI)
$(OCAMLMKTOP) -linkpkg -o test.top $(OCAMLFLAGS) $(BYTE_OBJS)
example.byte: $(BYTE_OBJS) $(CMI)
$(OCAMLC) -linkpkg -o $@ $(OCAMLFLAGS) $(BYTE_OBJS)
example.top: $(BYTE_OBJS) $(CMI)
$(OCAMLMKTOP) -linkpkg -o example.top $(OCAMLFLAGS) $(BYTE_OBJS)
# Common rules
.SUFFIXES: .ml .mli .cmo .cmi .cmx
.ml.cmo:
$(OCAMLC) $(OCAMLFLAGS) -c $<
.mli.cmi:
$(OCAMLC) $(OCAMLFLAGS) -c $<
.ml.cmx:
$(OCAMLOPT) $(OCAMLOPTFLAGS) -c $<
# Clean up
clean:
rm -f test.byte test.native
rm -f *.cm[ioaxt] *.cmax *.cmti *.o *.annot
# Dependencies
depend:
$(OCAMLDEP) $(INCLUDES) *.mli *.ml > .depend
include .depend
module M = Mutex
module C = Condition
type 'a t = ('a * M.t * C.t)
type 'a wait = WaitMore | Return of 'a
let create v = (v, M.create (), C.create ())
let wait (cell,m,c) (func: 'a -> 'b wait) : 'b =
Mutex.lock m;
let rec loop () =
begin try
match func cell with
| Return v -> v
| WaitMore -> begin
Condition.wait c m;
loop ()
end
with e ->
Mutex.unlock m;
raise e
end
in
let v = loop () in
Mutex.unlock m;
v
let signal (_,_,c) = Condition.signal c
let lock (cell,m,_) (func: 'a -> 'b) : 'b =
Mutex.lock m;
begin try
let v = func cell in
Mutex.unlock m;
v
with e ->
Mutex.unlock m;
raise e
end
let try_lock (cell,m,_) (func: 'a -> 'b) (iffail:'b) : 'b =
if Mutex.try_lock m then begin
begin try
let v = func cell in
Mutex.unlock m;
v
with e ->
Mutex.unlock m;
raise e
end
end else iffail
let get (cell,_,_) = cell
(** Monitor *)
type 'a t
type 'b wait = WaitMore | Return of 'b
val create : 'a -> 'a t
val signal : 'a t -> unit
val wait : 'a t -> ('a -> 'b wait) -> 'b
val lock : 'a t -> ('a -> 'b) -> 'b
val try_lock : 'a t -> ('a -> 'b) -> 'b -> 'b
val get : 'a t -> 'a
(* slots and lenses *)
type empty = Empty
type all_empty = empty * 'a as 'a
type ('a,'b,'ss,'tt) slot = ('ss -> 'a) * ('ss -> 'b -> 'tt)
let _0 = (fun (a,_) -> a), (fun (_,ss) b -> (b,ss))
let _1 = (fun (_,(a,_)) -> a), (fun (s0,(_,ss)) b -> (s0,(b,ss)))
let _2 = (fun (_,(_,(a,_))) -> a), (fun (s0,(s1,(_,ss))) b -> (s0,(s1,(b,ss))))
let _3 = (fun (_,(_,(_,(a,_)))) -> a), (fun (s0,(s1,(s2,(_,ss)))) b -> (s0,(s1,(s2,(b,ss)))))
let rec all_empty = Empty, all_empty
let run f x = snd (f x all_empty)
(* monads *)
type ('ss,'tt,'v) monad = 'ss -> 'tt * 'v
let return x p = p, x
let (>>=) m f p = let q,v = m p in f v q
let (>>) m n p = let q,_ = m p in n q
(* polarized session types *)
type req = Req
type resp = Resp
type cli = resp * req
type serv = req * resp
type 'p cont =
Msg : ('v * 'p cont Channel.t) -> [`msg of 'r * 'v * 'p] cont
| Branch : 'br -> [`branch of 'r * 'br] cont
| Chan : (('pp, 'rr) sess * 'p cont Channel.t) -> [`deleg of 'r * ('pp, 'rr) sess * 'p] cont
and ('p, 'r) sess = 'p cont Channel.t * 'r
(* service channels *)
type 'p channel = 'p cont Channel.t Channel.t
let new_channel = Channel.create
module SessionN = struct
let new_channel = Channel.create
let close (get,set) ss = set ss Empty, ()
let send (get,set) v ss =
let ch,q = get ss and ch' = Channel.create () in
Channel.send ch (Msg(v,ch')); set ss (ch',q), ()
let recv (get,set) ss =
let (ch,q) = get ss in let Msg(v,ch') = Channel.receive ch in
set ss (ch',q), v
let deleg_send (get0,set0) ~release:(get1,set1) ss =
let ch0,q1 = get0 ss and ch0' = Channel.create () in
let tt = set0 ss (ch0',q1) in
let ch1,q2 = get1 tt in
Channel.send ch0 (Chan((ch1,q2),ch0'));
set1 tt Empty, ()
let deleg_recv (get0,set0) ~bindto:(get1,set1) ss =
let ch0,q0 = get0 ss in
let Chan((ch1',q1),ch0') = Channel.receive ch0 in
let tt = set0 ss (ch0',q0) in
set1 tt (ch1',q1), ()
let accept ch ~bindto:(_,set) ss =
let ch' = Channel.receive ch in set ss (ch',(Req,Resp)), ()
let connect ch ~bindto:(_,set) ss =
let ch' = Channel.create () in Channel.send ch ch'; set ss (ch',(Resp,Req)), ()
let inp : 'p 'r. 'p -> 'r -> ('p,'r) sess = fun x _ -> Obj.magic x
let out : 'p 'r. ('p,'r) sess -> 'p = Obj.magic
let _branch_start : type br.
(([`branch of 'r1 * br], 'r1*'r2) sess, 'x, 'ss, 'xx) slot
-> (br * ('r1*'r2) -> ('ss, 'uu,'v) monad)
-> ('ss, 'uu, 'v) monad
= fun (get0,set0) f ss ->
let (ch,p) = get0 ss in
match Channel.receive ch with
| Branch(br) -> f (br,p) ss
let _branch :
(([`branch of 'r1 * 'br], 'r1*'r2) sess, ('p,'r1*'r2) sess, 'ss, 'tt1) slot
-> 'p * ('r1*'r2)
-> ('tt1,'uu,'v) monad
-> ('ss, 'uu, 'v) monad
= fun (_,set1) (c,p) m ss ->
let tt1 = set1 ss (inp c p)
in m tt1
let _select : type br p.
(([`branch of 'r2 * br],'r1*'r2) sess, (p,'r1*'r2) sess, 'ss, 'tt) slot
-> (p -> br)
-> ('ss, 'tt, 'v) monad = fun (get0,set0) f ss ->
let ch,p = get0 ss in
let k = Channel.create () in
Channel.send ch (Branch(f (out (k,p))));
set0 ss (k,p), ()
let branch2 (s1,f1) (s2,f2) =
_branch_start s1 (function
| `left(p1),q -> _branch s1 (p1,q) (f1 ())
| `right(p2),q -> _branch s2 (p2,q) (f2 ())
: [`left of 'p1 | `right of 'p2] * 'x -> 'y)
let select_left s = _select s (fun x -> `left(x))
let select_right s = _select s (fun x -> `right(x))
end
module Session0 = struct
let accept_ ch f v = run (fun v -> SessionN.accept ch ~bindto:_0 >> f v) v
let connect_ ch f v = run (fun v -> SessionN.connect ch ~bindto:_0 >> f v) v
let close () = SessionN.close _0
let recv () = SessionN.recv _0
let send v = SessionN.send _0 v
let branch2 = fun f g -> SessionN.branch2 (_0,f) (_0,g)
let select_left () = SessionN.select_left _0
let select_right () = SessionN.select_right _0
let _select f = SessionN._select _0 f
let _branch_start f = SessionN._branch_start _0 f
let _branch wit m = SessionN._branch _0 wit m
end
(* empty slots *)
type empty
type all_empty = empty * 'a as 'a
(* lenses on slots *)
type ('a,'b,'ss,'tt) slot
val _0 : ('a, 'b, ('a * 'ss), ('b * 'ss)) slot
val _1 : ('a, 'b, ('s0 * ('a * 'ss)), ('s0 * ('b * 'ss))) slot
val _2 : ('a, 'b, ('s0 * ('s1 * ('a * 'ss))), ('s0 * ('s1 * ('b * 'ss)))) slot
val _3 : ('a, 'b, ('s0 * ('s1 * ('s2 * ('a * 'ss)))), ('s0 * ('s1 * ('s2 * ('b * 'ss))))) slot
(* parameterized monads *)
type ('x,'y,'a) monad
val return : 'v -> ('x,'x,'v) monad
val (>>=) : ('x,'y,'a) monad -> ('a -> ('y, 'z, 'b) monad) -> ('x,'z,'b) monad
val (>>) : ('x,'y,'a) monad -> ('y,'z,'b) monad -> ('x,'z,'b) monad
val run : ('a -> (all_empty, all_empty, 'b) monad) -> 'a -> 'b
(* channels *)
type 'p channel
val new_channel : unit -> 'p channel
(* polarized session types *)
type req and resp
type cli = resp * req
type serv = req * resp
type ('p, 'q) sess
module Session0 : sig
val accept_ : 'p channel -> ('v -> (('p,serv) sess * all_empty, all_empty, 'w) monad) -> 'v -> 'w
val connect_ : 'p channel -> ('v -> (('p,cli) sess * all_empty, all_empty, 'w) monad) -> 'v -> 'w
val close : unit -> (([`close], 'r1*'r2) sess * 'ss, empty * 'ss, unit) monad
val send : 'v -> (([`msg of 'r2 * 'v * 'p], 'r1*'r2) sess * 'ss, ('p, 'r1*'r2) sess * 'ss, unit) monad
val recv : unit -> (([`msg of 'r1 * 'v * 'p], 'r1*'r2) sess * 'ss, ('p, 'r1*'r2) sess * 'ss, 'v) monad
val branch2
: (unit -> (('p1, 'r1*'r2) sess * 'ss, 'uu, 'a) monad)
-> (unit -> (('p2, 'r1*'r2) sess * 'ss, 'uu, 'a) monad)
-> (([`branch of 'r1 * [`left of 'p1 | `right of 'p2]], 'r1*'r2) sess * 'ss, 'uu, 'a) monad
val select_left
: unit -> (([`branch of 'r2 * [>`left of 's1]], 'r1*'r2) sess * 'ss, ('s1, 'r1*'r2) sess * 'ss, unit) monad
val select_right
: unit -> (([`branch of 'r2 * [>`right of 's2]], 'r1*'r2) sess * 'ss, ('s2, 'r1*'r2) sess * 'ss, unit) monad
val _select
: ('p -> 'br)
-> (([`branch of 'r2 * 'br],'r1*'r2) sess * 'ss, ('p,'r1*'r2) sess * 'ss, unit) monad
val _branch_start
: ('br * ('r1*'r2) -> (([`branch of 'r1 * 'br], 'r1*'r2) sess * 'ss, 'uu,'v) monad)
-> (([`branch of 'r1 * 'br], 'r1*'r2) sess * 'ss, 'uu, 'v) monad
val _branch
: 'p * ('r1*'r2)
-> (('p,'r1*'r2) sess * 'ss, 'uu, 'v) monad
-> (([`branch of 'r1 * 'br], 'r1*'r2) sess * 'ss, 'uu, 'v) monad
end
module SessionN : sig
val accept : 'p channel -> bindto:(empty, ('p,serv) sess, 'ss, 'tt) slot -> ('ss, 'tt, unit) monad
val connect : 'p channel -> bindto:(empty, ('p,cli) sess, 'ss, 'tt) slot -> ('ss, 'tt, unit) monad
val close : (([`close], 'r1*'r2) sess, empty, 'ss, 'tt) slot -> ('ss, 'tt, unit) monad
val send : (([`msg of 'r2 * 'v * 'p], 'r1*'r2) sess, ('p, 'r1*'r2) sess, 'ss, 'tt) slot -> 'v -> ('ss, 'tt, unit) monad
val recv : (([`msg of 'r1 * 'v * 'p], 'r1*'r2) sess, ('p, 'r1*'r2) sess, 'ss, 'tt) slot -> ('ss, 'tt, 'v) monad
val branch2
: (([`branch of 'r1 * [`left of 'p1 | `right of 'p2]], 'r1*'r2) sess, ('p1, 'r1*'r2) sess, 'ss, 'tt1) slot * (unit -> ('tt1, 'uu, 'a) monad)
-> (([`branch of 'r1 * [`left of 'p1 | `right of 'p2]], 'r1*'r2) sess, ('p2, 'r1*'r2) sess, 'ss, 'tt2) slot * (unit -> ('tt2, 'uu, 'a) monad)
-> ('ss, 'uu, 'a) monad
val select_left
: (([`branch of 'r2 * [>`left of 's1]], 'r1*'r2) sess,
('s1, 'r1*'r2) sess, 'ss, 'tt) slot
-> ('ss, 'tt, unit) monad
val select_right
: (([`branch of 'r2 * [>`right of 's2]], 'r1*'r2) sess,
('s2, 'r1*'r2) sess, 'ss, 'tt) slot
-> ('ss, 'tt, unit) monad
val _select
: (([`branch of 'r2 * 'br],'r1*'r2) sess, ('p,'r1*'r2) sess, 'ss, 'tt) slot
-> ('p -> 'br)
-> ('ss, 'tt, unit) monad
val _branch_start
: (([`branch of 'r1 * 'br], 'r1*'r2) sess, 'x, 'ss, 'xx) slot
-> ('br * ('r1*'r2) -> ('ss, 'uu,'v) monad)
-> ('ss, 'uu, 'v) monad
val _branch
: (([`branch of 'r1 * 'br], 'r1*'r2) sess, ('p,'r1*'r2) sess, 'ss, 'tt1) slot
-> 'p * ('r1*'r2)
-> ('tt1, 'uu, 'v) monad
-> ('ss, 'uu, 'v) monad
val deleg_send
: (([`deleg of 'r2 * ('pp, 'rr) sess * 'p], 'r1*'r2) sess, ('p, 'r1*'r2) sess, 'ss, 'tt) slot
-> release:(('pp, 'rr) sess, empty, 'tt, 'uu) slot
-> ('ss, 'uu, unit) monad
val deleg_recv
: (([`deleg of 'r1 * ('pp, 'rr) sess * 'p], 'r1*'r2) sess, ('p, 'r1*'r2) sess, 'ss, 'tt) slot
-> bindto:(empty, ('pp, 'rr) sess, 'tt, 'uu) slot
-> ('ss, 'uu, unit) monad
end
let race_check =
let mutex = Mutex.create () in
fun () ->
if not (Mutex.try_lock mutex) then begin
failwith "race detected!"
end;
Thread.delay 0.001;
Mutex.unlock mutex
(* Test for channels *)
module C = Channel;;
(* send n to 1 of type int on channel c *)
let sender c put_ n =
let put_ = put_ () in
let ok = ref false in
ignore (Thread.create (fun _ ->
Thread.delay 0.001;
print_string "s "; flush stderr;
for i=n downto 1 do
Thread.yield ();
put_ c i
done;
ok := true) ());
ok
(* receive and sum all ints until finish () = true *)
let receiver c finish get_ () =
let get_ = get_ () in
let ok = ref None in
ignore (Thread.create (fun _ ->
Thread.delay 0.001;
print_string "r "; flush stderr;
let sum = ref 0 in
let rec loop () =
Thread.yield (); (* explicit yield is needed since OCaml's thread lacks fairness on this case *)
sum := get_ c + !sum;
if not (finish ()) || not (C.is_empty c) then
loop ()
else
ok := Some (!sum)
in loop ()) ());
ok
let rec loop n f x =
match n with
| 0 -> []
| n -> f x :: loop (n-1) f x
let test_chan put_ get_ =
let c = C.create () in
let amount = 1000 and threads = 100
in
(* start sender threads *)
let senders = loop threads (sender c put_) amount in
let senders_finished () = List.for_all (fun c -> !c) senders
in
(* start receiver threads *)
let receivers = loop threads (receiver c senders_finished get_) () in
let receivers_finished () = List.for_all (fun c -> !c<>None) receivers
in
(* wait until all receives finishes *)
while not (receivers_finished ()) do
Thread.delay 0.;
done;
let get x =
match !x with
| None -> failwith "impossible"
| Some v -> print_string (string_of_int v^" "); v
in
(* sum all received values *)
let sum = List.fold_left (fun x r -> x + (get r)) 0 receivers in
(* check it *)
print_endline (string_of_int sum);
assert ((amount*(amount+1))/2 * threads = sum)
let _ =
let cnt = 100000 in
let c = C.create () in
for i=0 to cnt-1 do
ignore (Thread.create (fun _ -> Thread.delay 0.001; C.send c (Random.bits ())) ());
let v = C.peek c and w = C.receive c in
assert (v = w)
done;
print_endline "Channel.peek/receive OK.";
for i=0 to cnt-1 do
let len = Random.bits () mod 100 + 1 in
for n=0 to len-1 do
C.send c n;
done;
assert (C.length c = len);
assert (not (C.is_empty c));
C.clear c;
assert (C.is_empty c);
done;
print_endline "Channel.length/is_empty OK.";
()
let _ =
let receive_all () c = C.receive_all c (+) 0
and try_receive () c =
match C.try_receive c with
| None -> 0
| Some v -> v
and receive_all_ () =
let buf = ref 0 in
fun c ->
C.receive_all_ c (fun n -> buf := n + !buf);
let v = !buf in
buf := 0;
v
and send () c n = C.send c n
and send_all () =
let buf = ref [] in
fun c n ->
buf := n :: !buf;
if List.length !buf=10 || n = 1 then begin
C.send_all c (List.rev !buf);
buf := []
end
in
test_chan send receive_all;
print_endline "Channel.send/receive_all OK.";
test_chan send_all receive_all;
print_endline "Channel.send/receive_all OK.";
test_chan send receive_all_;
print_endline "Channel.send/receive_all_ OK.";
test_chan send_all receive_all_;
print_endline "Channel.send/receive_all_ OK.";
test_chan send try_receive;
print_endline "Channel.send/try_receive OK.";
test_chan send_all try_receive;
print_endline "Channel.send/try_receive OK.";
()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment