Skip to content

Instantly share code, notes, and snippets.

@arbipher
Created January 4, 2024 23:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arbipher/e48d3cddac84f24cfdf15076d31154d3 to your computer and use it in GitHub Desktop.
Save arbipher/e48d3cddac84f24cfdf15076d31154d3 to your computer and use it in GitHub Desktop.
Tests for the Cartesian product of two `Lwt_stream`s
open Core
module Test_stream = struct
  (** Elements fed into each input stream in every test: [0; 1; 2]. *)
  let msgs = List.init 3 ~f:Fn.id

  (** Pairs every test expects from [product_stream], in the expected order:
      the first component cycles through all of its values before the second
      component advances. *)
  let expected_pairs =
    [ (0, 0); (1, 0); (2, 0); (0, 1); (1, 1); (2, 1); (0, 2); (1, 2); (2, 2) ]

  (** [check_pairs actual] fails the running Alcotest case unless [actual]
      is equal to [expected_pairs]. *)
  let check_pairs actual =
    Alcotest.(check (list (pair int int))) "equal" expected_pairs actual

  (** [push_all push xs] hands every element of [xs], in order, to the push
      function [push] (wrapped in [Some]). It does not close the stream. *)
  let push_all push xs = List.iter xs ~f:(fun x -> push (Some x))

  (** [closed_stream xs] builds a push-based stream with
      [Lwt_stream.create], pushes every element of [xs] and then closes it
      by pushing [None]. *)
  let closed_stream xs =
    let stream, push = Lwt_stream.create () in
    push_all push xs;
    push None;
    stream

  (** Both inputs come from [Lwt_stream.of_list]; the product is read with
      [Lwt_stream.get_available], without calling [Lwt_main.run]. *)
  let two_streams_from_list () =
    let s1 = Lwt_stream.of_list msgs in
    let s2 = Lwt_stream.of_list msgs in
    check_pairs (Lwt_stream.get_available (product_stream s1 s2))

  (** The first input is a push-based stream that is filled and closed
      before the product is built; the second comes from
      [Lwt_stream.of_list]. *)
  let one_stepped_one_list () =
    let s1 = closed_stream msgs in
    let s2 = Lwt_stream.of_list msgs in
    check_pairs (Lwt_stream.get_available (product_stream s1 s2))

  (** Both inputs are push-based streams; the first is completely filled and
      closed before the second one is created. *)
  let two_stepped () =
    let s1 = closed_stream msgs in
    let s2 = closed_stream msgs in
    check_pairs (Lwt_stream.get_available (product_stream s1 s2))

  (** Both inputs are push-based streams whose pushes alternate element by
      element; both are closed before the product is built. *)
  let two_stepped_interleaved () =
    let s1, p1 = Lwt_stream.create () in
    let s2, p2 = Lwt_stream.create () in
    List.iter msgs ~f:(fun m ->
        p1 (Some m);
        p2 (Some m));
    p1 None;
    p2 None;
    check_pairs (Lwt_stream.get_available (product_stream s1 s2))

  (** Same interleaved pushes as [two_stepped_interleaved], but the input
      streams are deliberately left open: [None] is never pushed.

      Author's note: this test can pass when [product_stream] uses [iter_p]
      rather than [iter_s]. *)
  let two_opened () =
    let s1, p1 = Lwt_stream.create () in
    let s2, p2 = Lwt_stream.create () in
    List.iter msgs ~f:(fun m ->
        p1 (Some m);
        p2 (Some m));
    check_pairs (Lwt_stream.get_available (product_stream s1 s2))

  (** Same setup as [two_stepped], but the product is drained with
      [Lwt_stream.to_list] under [Lwt_main.run] instead of being read with
      [Lwt_stream.get_available]. *)
  let two_closed () =
    let s1 = closed_stream msgs in
    let s2 = closed_stream msgs in
    let ans = Lwt_main.run (Lwt_stream.to_list (product_stream s1 s2)) in
    check_pairs ans

  (** Every test case of this module, ready to be passed to [Alcotest.run].
      Each label matches the name of the function it runs. *)
  let all_tests =
    [
      Alcotest.test_case "two_streams_from_list" `Quick two_streams_from_list;
      Alcotest.test_case "one_stepped_one_list" `Quick one_stepped_one_list;
      Alcotest.test_case "two_stepped" `Quick two_stepped;
      Alcotest.test_case "two_stepped_interleaved" `Quick
        two_stepped_interleaved;
      Alcotest.test_case "two_opened" `Quick two_opened;
      Alcotest.test_case "two_closed" `Quick two_closed;
    ]
end
(* Test executable entry point: registers the single "lwt_stream" suite and
   hands it to the Alcotest runner under the name "Tests". *)
let () =
  let suites = [ ("lwt_stream", Test_stream.all_tests) ] in
  Alcotest.run "Tests" suites
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment