Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
OUnit patch proposal for parallelism
--- old-ounit/src/oUnitCore.ml 2013-03-16 18:45:34.806295575 +0100
+++ new-ounit/src/oUnitCore.ml 2013-03-16 18:45:34.814295575 +0100
@@ -14,6 +14,11 @@
* Types and global states.
*)
+type runner_type =
+ | Sequential
+ | Threaded
+ | Processus
+
let global_verbose =
OUnitConf.make
"verbose"
@@ -51,19 +56,21 @@
let global_chooser = ref OUnitChooser.simple
+let thread_pool_threshold = 10
+(** Under this limit, create exactly one thread by test *)
+let thread_pool_size = 15
+(** Should be adjusted but depends of too many parameters, make you own tests *)
+
(* Events which can happen during testing *)
-(* Run all tests, report starts, errors, failures, and return the results *)
-let perform_test logger test =
+let report path e = OUnitLogger.report !global_logger (TestEvent (path, e))
- let report path e =
- OUnitLogger.report logger (TestEvent (path, e))
- in
-
- let run_test_case f path =
- let result =
- try
+(* Run all tests, sequential version *)
+let run_all_tests_seq test_cases =
+ let run_test_case f path =
+ let result =
+ try
f ();
RSuccess
with e ->
@@ -73,17 +80,169 @@
else
None
in
- match e with
+ match e with
| Failure s -> RFailure (s, backtrace)
| Skip s -> RSkip s
| Todo s -> RTodo s
| s -> RError (Printexc.to_string s, backtrace)
+ in
+ let position =
+ OUnitLogger.position !global_logger
+ in
+ result, position
+ in
+ let runner (path, f) =
+ let result, position =
+ report path EStart;
+ run_test_case f path
in
- let position =
- OUnitLogger.position logger
+ report path (EResult result);
+ report path EEnd;
+ path, result, position
+ in
+ let rec iter state =
+ match state.tests_planned with
+ | [] ->
+ state.results
+ | _ ->
+ let (path, f) = !global_chooser state in
+ let result = runner (path, f) in
+ iter
+ {
+ results = result :: state.results;
+ tests_planned =
+ List.filter
+ (fun (path', _) -> path <> path')
+ state.tests_planned
+ }
+ in
+ iter {results = []; tests_planned = test_cases}
+
+(* Run all test, threaded version *)
+let run_all_tests_threaded test_cases =
+ (* perform_test.run_test_case equivalent *)
+ let thread_run test_fun = try
+ test_fun (); RSuccess
+ with e -> (* No backtraces because I suspect them to not be thread-safe *)
+ match e with
+ | Failure s -> RFailure (s, None)
+ | Skip s -> RSkip s
+ | Todo s -> RTodo s
+ | s -> RError (Printexc.to_string s, None)
+ in
+
+ (* thread-wide synchronization *)
+ let thread_main (wait_chan, result_chan) =
+ while true do
+ let event = Event.receive wait_chan in
+ let (test_path, test_fun) = Event.sync event in
+ report test_path EStart; (* FIXME *)
+ (* seems a very bad idea as report is NOT thread safe, broking lines in *)
+ (* the log file, maybe more one day. Find an other solution of patch*)
+ (* report *)
+ let test_res = thread_run test_fun in
+ Event.sync (Event.send result_chan (test_path, test_res))
+ done
+ in
+
+ (* application-wide synchronization, end of perfom_test.runner equivalent *)
+ let synchronizer_main (test_number, result_chan, suite_result_chan) =
+ let i = ref test_number and l = ref [] in
+ while !i > 0 do
+ let (path, res) = Event.sync (Event.receive result_chan) in
+ report path (EResult res);
+ report path EEnd;
+ l := (path, res, None)::!l;
+ decr i
+ done;
+ Event.sync (Event.send suite_result_chan !l)
+ in
+
+ (* beginning of preform_test.runn equivalent, wait results from synchronizer *)
+ let rec schedule wait_chan suite_result_chan = function
+ | [] -> Event.sync (Event.receive suite_result_chan)
+ | test::tests_planned ->
+ Event.sync (Event.send wait_chan test);
+ schedule wait_chan suite_result_chan tests_planned
+ in
+ (* init channels to pass values with easy synchronization *)
+ let len = List.length test_cases
+ and wait_chan = Event.new_channel () (* threads will get tests by there *)
+ and result_chan = Event.new_channel () (* and send test result here *)
+ and suite_result_chan = Event.new_channel () (* theses result will be
+ * aggregated here
+ *)
+ in
+ (* then init our threads: a pool, a scheduler that dispatches tests,*)
+ (* and a synchronizer that aggregates result and call the logger *)
+ let pool_size = if len < thread_pool_threshold
+ then len else thread_pool_size
+ in
+ Thread.create synchronizer_main (len, result_chan, suite_result_chan);
+ for i = 0 to pool_size do
+ Thread.create thread_main (wait_chan, result_chan)
+ done;
+ schedule wait_chan suite_result_chan test_cases
+
+
+
+
+let base_port = 32757 (* last 5 md5 of "ocaml" *)
+
+(* Run all tests, processus version *)
+let run_all_tests_process test_cases =
+
+ let mk_sock () =
+ (* we must choose PF_INET as it's the only portable choice *)
+ let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
+ Unix.bind
+ sock
+ (Unix.ADDR_INET (Unix.inet_addr_loopback, base_port));
+ sock
+
+ and mk_process nb =
+ let l = ref [] in
+ for i = 0 to nb do
+ l := (Unix.create_process
+ "oUnitWorker"
+ [| (string_of_int base_port) |]
+ Unix.stdin
+ Unix.stdout
+ Unix.stderr) :: !l
+ done;
+ !l
+
+ (* send a test in one socket *)
+ and send_test (path, test) sock =
+ report path EStart;
+ let data = Marshal.to_string (Some (path, test)) [Marshal.Closures] in
+ if 0 = Unix.write sock data 0 (String.length data)
+ then () (* error handling *)
+ else ()
+
+ (* read a result from one socket then report it *)
+ and recv_test sock =
+ let buff = Buffer.create 4096
+ and str = ref (String.make 4096 (Char.chr 0))
in
- result, position
+ while 0 < Unix.read sock !str 0 (String.length !str) do
+ Buffer.add_string buff !str;
+ str := String.make 4096 (Char.chr 0)
+ done;
+ let (path, res) = Marshal.from_string (Buffer.contents buff) 0 in
+ report path EEnd;
+ (path, res)
+
+ in
+
+ let sock = mk_sock ()
+ and process = mk_process 5
in
+ (* TODO accept on socket, then select on the resulting sockets *)
+ []
+
+(* Run all tests, report starts, errors, failures, and return the results *)
+let perform_test logger test =
let rec flatten_test path acc =
function
| TestCase(f) ->
@@ -96,39 +255,18 @@
((ListItem cnt)::path)
acc t)
acc tests
-
| TestLabel (label, t) ->
flatten_test ((Label label)::path) acc t
in
let test_cases =
List.rev (flatten_test [] [] test)
in
- let runner (path, f) =
- let result, position =
- report path EStart;
- run_test_case f path
- in
- report path (EResult result);
- report path EEnd;
- path, result, position
- in
- let rec iter state =
- match state.tests_planned with
- | [] ->
- state.results
- | _ ->
- let (path, f) = !global_chooser state in
- let result = runner (path, f) in
- iter
- {
- results = result :: state.results;
- tests_planned =
- List.filter
- (fun (path', _) -> path <> path')
- state.tests_planned
- }
- in
- iter {results = []; tests_planned = test_cases}
+ let run_type = Processus in
+ match run_type with (* TODO use a parameter in the function to switch *)
+ | Sequential -> run_all_tests_seq test_cases
+ | Threaded -> run_all_tests_threaded test_cases
+ | Processus -> run_all_tests_process test_cases
+
(* A simple (currently too simple) text based test runner *)
let run_test_tt ?verbose test =
@@ -164,7 +302,7 @@
(* Now start the test *)
let running_time, test_results =
time_fun
- perform_test
+ perform_test
logger
test
in
(***********************************************************************)
(* The OUnit library *)
(* *)
(* Copyright (C) 2002-2008 Maas-Maarten Zeeman. *)
(* Copyright (C) 2010 OCamlCore SARL *)
(* *)
(* See LICENSE for details. *)
(***********************************************************************)
open OUnitUtils
include OUnitTypes
(*
* Types and global states.
*)
(** Strategy used by [perform_test] to execute the flattened test cases:
    [Sequential] selects [run_all_tests_seq], [Threaded] selects
    [run_all_tests_threaded] and [Processus] selects
    [run_all_tests_process]. *)
type runner_type = Sequential | Threaded | Processus
(* Configuration entry "verbose", registered through [OUnitConf.make] with a
   default of [false]. Elsewhere in this file the resulting value is applied
   to [()] to read the current setting. *)
let global_verbose =
  let as_flag flag = Arg.Set flag in
  OUnitConf.make
    "verbose"
    as_flag
    ~printer:string_of_bool
    false
    "Run test in verbose mode."
(* Configuration entry "output_file": optional path of the log file.
   The default is "oUnit.log" inside the "_build" sub-directory of the
   current working directory when that directory exists, and inside the
   current working directory otherwise. The alternate entry
   "no_output_file" sets the value to [None]. *)
let global_output_file =
  let default_log =
    let cwd = Sys.getcwd () in
    let build_dir = Filename.concat cwd "_build" in
    let has_build_dir =
      Sys.file_exists build_dir && Sys.is_directory build_dir
    in
    Filename.concat (if has_build_dir then build_dir else cwd) "oUnit.log"
  in
  let show = function
    | Some fn -> Printf.sprintf "%S" fn
    | None -> "<none>"
  in
  let disable r = Arg.Unit (fun () -> r := None) in
  OUnitConf.make
    "output_file"
    ~arg_string:"fn"
    ~alternates:["no_output_file",
                 disable,
                 None,
                 "Prevent to write log in a file."]
    ~printer:show
    (fun r -> Arg.String (fun s -> r := Some s))
    (Some default_log)
    "Output verbose log in the given file."
(* TODO: remove *)
(* Logger that [report] sends test events to. It starts as
   [OUnitLogger.null_logger]; [run_test_tt] installs its combined logger here
   before running the tests and restores the null logger once the run is
   over. *)
let global_logger = ref OUnitLogger.null_logger
(* Strategy that picks the next test to run. [run_all_tests_seq] applies it
   to the current state (results so far and tests still planned) and expects
   a (path, test function) pair back. Defaults to [OUnitChooser.simple]. *)
let global_chooser = ref OUnitChooser.simple
(** When a suite has fewer test cases than this limit, the threaded runner
    derives the size of its thread pool from the number of tests instead of
    using [thread_pool_size]. *)
let thread_pool_threshold = 10
(** Size of the thread pool used by the threaded runner once the suite has
    at least [thread_pool_threshold] test cases. The best value depends on
    too many parameters to pick a universal one: tune it with your own
    tests. *)
let thread_pool_size = 15
(* Send the test event [e], tagged with the test [path], to the logger
   currently stored in [global_logger]. *)
let report path e =
  let event = TestEvent (path, e) in
  OUnitLogger.report !global_logger event
(* Run all tests, sequential version.

   Repeatedly asks [!global_chooser] which planned test to run next, runs it
   in the calling thread, reports [EStart], [EResult] and [EEnd] for it, and
   removes every planned test sharing its path. Returns the list of
   (path, result, log position) triples, most recently run test first. *)
let run_all_tests_seq test_cases =
  (* Translate an exception escaping a test body into a test result. Must be
     called straight from the handler so that the recorded backtrace is
     still the one of [exn]. *)
  let result_of_exn exn =
    let backtrace =
      if Printexc.backtrace_status () then
        Some (Printexc.get_backtrace ())
      else
        None
    in
    match exn with
    | Failure msg -> RFailure (msg, backtrace)
    | Skip msg -> RSkip msg
    | Todo msg -> RTodo msg
    | other -> RError (Printexc.to_string other, backtrace)
  in
  (* Run one test case and emit its three events. *)
  let execute (path, f) =
    report path EStart;
    let result =
      try
        f ();
        RSuccess
      with exn ->
        result_of_exn exn
    in
    (* The logger position is sampled after the test body has run. *)
    let position = OUnitLogger.position !global_logger in
    report path (EResult result);
    report path EEnd;
    (path, result, position)
  in
  let rec loop finished = function
    | [] ->
        finished
    | pending ->
        let ((path, _) as chosen) =
          !global_chooser {results = finished; tests_planned = pending}
        in
        let outcome = execute chosen in
        let still_pending =
          List.filter (fun (path', _) -> path <> path') pending
        in
        loop (outcome :: finished) still_pending
  in
  loop [] test_cases
(* Run all tests, threaded version.

   Threads involved:
   - a pool of workers, each receiving tests on [wait_chan], running them
     and sending (path, result) on [result_chan];
   - one synchronizer, which receives exactly one result per test, reports
     it and finally hands the complete list over on [suite_result_chan];
   - the calling thread, which dispatches the tests, waits for that list and
     then tells every worker to stop.

   The pool holds one worker per test when there are fewer than
   [thread_pool_threshold] tests, and [thread_pool_size] workers otherwise.

   Returns (path, result, None) triples in reverse order of completion.
   Failures and errors carry no backtrace. *)
let run_all_tests_threaded test_cases =
  (* [report] is called from several threads at once; serialise the calls so
     that concurrent events cannot interleave inside the logger. *)
  let report_lock = Mutex.create () in
  let locked_report path e =
    Mutex.lock report_lock;
    (try
       report path e
     with exn ->
       Mutex.unlock report_lock;
       raise exn);
    Mutex.unlock report_lock
  in

  (* Run one test body and turn its outcome into a result. No backtrace is
     collected: the original author suspected them not to be thread-safe. *)
  let thread_run test_fun =
    try
      test_fun ();
      RSuccess
    with
    | Failure s -> RFailure (s, None)
    | Skip s -> RSkip s
    | Todo s -> RTodo s
    | e -> RError (Printexc.to_string e, None)
  in

  (* Rendez-vous channels. [wait_chan] carries [Some test] for work to do
     and [None] to ask the receiving worker to terminate. *)
  let wait_chan = Event.new_channel () in
  let result_chan = Event.new_channel () in
  let suite_result_chan = Event.new_channel () in

  (* Worker: process tests until told to stop. *)
  let rec thread_main () =
    match Event.sync (Event.receive wait_chan) with
    | None ->
        ()
    | Some (test_path, test_fun) ->
        locked_report test_path EStart;
        let test_res = thread_run test_fun in
        Event.sync (Event.send result_chan (test_path, test_res));
        thread_main ()
  in

  (* Synchronizer: collect [test_number] results, report each of them, then
     publish the whole list. *)
  let synchronizer_main test_number =
    let collected = ref [] in
    for _i = 1 to test_number do
      let (path, res) = Event.sync (Event.receive result_chan) in
      locked_report path (EResult res);
      locked_report path EEnd;
      collected := (path, res, None) :: !collected
    done;
    Event.sync (Event.send suite_result_chan !collected)
  in

  let len = List.length test_cases in
  let pool_size =
    if len < thread_pool_threshold then len else thread_pool_size
  in
  ignore (Thread.create synchronizer_main len);
  (* Exactly [pool_size] workers: the loop starts at 1, not 0. *)
  for _i = 1 to pool_size do
    ignore (Thread.create thread_main ())
  done;

  (* Dispatch: each send blocks until some worker is free to take the test. *)
  List.iter
    (fun test -> Event.sync (Event.send wait_chan (Some test)))
    test_cases;
  let results = Event.sync (Event.receive suite_result_chan) in
  (* Every result is in, so every worker is idle on [wait_chan]: release
     them all instead of leaving them blocked forever. *)
  for _i = 1 to pool_size do
    Event.sync (Event.send wait_chan None)
  done;
  results
(* TCP port on the loopback interface used between the test driver and its
   worker processes: [run_all_tests_process] binds a socket to it and passes
   it as a command-line argument to each "oUnitWorker" it spawns. *)
let base_port = 32757 (* last 5 md5 of "ocaml" *)
(* Run all tests, process version.

   UNFINISHED: the dispatch loop is still a TODO, so this function runs no
   test and always returns the empty list. Do not select this runner until
   the TODO below is implemented.

   What is in place is the life cycle around that loop: bind the loopback
   socket, spawn the workers, and release both before returning. The local
   helpers [send_test] and [recv_test] are the building blocks intended for
   the dispatch loop; they exchange [Marshal]-encoded values over buffered
   channels, which takes care of message framing.

   NOTE(review): tests are closures and are marshalled with
   [Marshal.Closures]. The Marshal documentation says such values can only
   be read back by the very same program, while "oUnitWorker" is a separate
   executable. Confirm this design can work before finishing it. *)
let run_all_tests_process test_cases =
  let worker_count = 5 in

  (* Socket bound to loopback:[base_port]. PF_INET is used because it is the
     only portable choice. The descriptor is closed if [bind] fails. *)
  let mk_sock () =
    let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
    (try
       Unix.bind sock (Unix.ADDR_INET (Unix.inet_addr_loopback, base_port))
     with exn ->
       Unix.close sock;
       raise exn);
    sock
  in

  (* Spawn exactly [nb] workers and return their pids. The argument vector
     starts with the program name, so that the worker finds the port in
     [Sys.argv.(1)] as it expects. *)
  let mk_process nb =
    let pids = ref [] in
    for _i = 1 to nb do
      let pid =
        Unix.create_process
          "oUnitWorker"
          [| "oUnitWorker"; string_of_int base_port |]
          Unix.stdin
          Unix.stdout
          Unix.stderr
      in
      pids := pid :: !pids
    done;
    !pids
  in

  (* Send one test to a worker through its output channel. *)
  let send_test (path, test) oc =
    report path EStart;
    Marshal.to_channel oc (Some (path, test)) [Marshal.Closures];
    flush oc
  in

  (* Read one result from a worker and report it, like the other runners
     do, as [EResult] followed by [EEnd]. *)
  let recv_test ic =
    let (path, res) = Marshal.from_channel ic in
    report path (EResult res);
    report path EEnd;
    (path, res, None)
  in

  (* Release everything acquired by this run. Errors are ignored on purpose:
     a worker may already have exited by itself. *)
  let shutdown sock pids =
    (try Unix.close sock with Unix.Unix_error _ -> ());
    List.iter
      (fun pid ->
         (try Unix.kill pid Sys.sigkill with Unix.Unix_error _ -> ());
         (try ignore (Unix.waitpid [] pid) with Unix.Unix_error _ -> ()))
      pids
  in

  let sock = mk_sock () in
  let pids =
    try
      mk_process worker_count
    with exn ->
      shutdown sock [];
      raise exn
  in
  (* TODO listen and accept on [sock], then select on the resulting sockets,
     feeding [test_cases] to the workers with [send_test] and collecting the
     results with [recv_test]. *)
  let results = [] in
  shutdown sock pids;
  results
(* Run all tests, report starts, errors, failures, and return the results.

   The test tree is flattened into a list of (path, test function) pairs,
   in declaration order, which is then handed to one of the runners.

   The sequential runner is used: the process-based runner is an unfinished
   stub that returns an empty result list without executing anything, so
   selecting it made every suite look successful while running no test.

   NOTE(review): [logger] is not used here. Events are sent by [report],
   which goes through [global_logger]; callers are expected to have
   installed their logger there, as [run_test_tt] does. *)
let perform_test logger test =
  let rec flatten_test path acc =
    function
    | TestCase f ->
        (path, f) :: acc
    | TestList tests ->
        fold_lefti
          (fun acc t cnt ->
             flatten_test ((ListItem cnt) :: path) acc t)
          acc tests
    | TestLabel (label, t) ->
        flatten_test ((Label label) :: path) acc t
  in
  let test_cases =
    List.rev (flatten_test [] [] test)
  in
  let run_type = Sequential in
  match run_type with (* TODO use a parameter in the function to switch *)
  | Sequential -> run_all_tests_seq test_cases
  | Threaded -> run_all_tests_threaded test_cases
  | Processus -> run_all_tests_process test_cases
(* A simple (currently too simple) text based test runner.

   Enables backtrace recording, builds a logger combining the text, HTML and
   JUnit loggers, installs it in [global_logger], dumps the configuration,
   runs [test] through [perform_test], reports the global results, then
   closes the logger and restores the null logger.

   Returns the test results for further processing. The [?verbose] argument
   is accepted but not read by this function. *)
let run_test_tt ?verbose test =
  Printexc.record_backtrace true;
  let text_logger =
    OUnitLogger.create
      (global_output_file ())
      (global_verbose ())
      OUnitLogger.null_logger
  in
  let html_logger = OUnitLoggerHTML.create () in
  let junit_logger = OUnitLoggerJUnit.create () in
  let logger =
    OUnitLogger.combine [text_logger; html_logger; junit_logger]
  in
  (* TODO: is it really useful to override this ? *)
  global_logger := logger;
  OUnitConf.dump (OUnitLogger.report logger);

  (* Now start the test *)
  let running_time, test_results = time_fun perform_test logger test in

  (* Print test report *)
  let summary =
    GlobalEvent
      (GResults (running_time, test_results, test_case_count test))
  in
  OUnitLogger.report logger summary;

  (* Reset logger. *)
  OUnitLogger.close logger;
  global_logger := OUnitLogger.null_logger;

  test_results
(* Call this one from your test suites.

   Registers the "only_test" and "list_test" configuration entries, loads
   the configuration with [arg_specs], then either prints the path of every
   test case of [suite] (when "list_test" is set) or runs the suite,
   restricted to the "only_test" selection when there is one, and passes
   the results to [fexit].

   Without [fexit], the program exits with status 1 when the results are
   not successful. Raises [Failure] when the "only_test" selection matches
   no test. *)
let run_test_tt_main ?(arg_specs=[]) ?(set_verbose=ignore) ?fexit suite =
  let exit_on_failure test_results =
    if not (was_successful test_results) then
      exit 1
  in
  let fexit =
    match fexit with
    | None -> exit_on_failure
    | Some f -> f
  in
  let quoted_list lst =
    String.concat "," (List.map (Printf.sprintf "%S") lst)
  in
  let only_test =
    OUnitConf.make
      "only_test"
      ~arg_string:"path"
      ~printer:quoted_list
      (fun r -> Arg.String (fun str -> r := str :: !r))
      []
      "Run only the selected tests."
  in
  let list_test =
    OUnitConf.make
      "list_test"
      (fun r -> Arg.Set r)
      ~printer:string_of_bool
      false
      "List tests"
  in
  OUnitConf.load arg_specs;
  if list_test () then
    List.iter
      (fun pth -> print_endline (string_of_path pth))
      (OUnitTest.test_case_paths suite)
  else
    begin
      let nsuite =
        match only_test () with
        | [] ->
            suite
        | _ :: _ ->
            begin
              match OUnitTest.test_filter ~skip:true (only_test ()) suite with
              | Some test ->
                  test
              | None ->
                  failwith
                    (Printf.sprintf
                       "Filtering test %s lead to no tests."
                       (String.concat ", " (only_test ())))
            end
      in
      set_verbose (global_verbose ());
      let test_results =
        run_test_tt ~verbose:(global_verbose ()) nsuite
      in
      fexit test_results
    end
(** OUnitWorker
*
* An OUnitWorker is a single process that waits on a socket, reads a test,
* executes it, and returns the result on the same socket.
*
* This needs to be done in a separate process because OCaml threads are not
* truly concurrent. Moreover, we cannot use Unix.fork because it is not portable.
*)
open OUnitTypes
(* Reverse application: [x |> f] is [f x]. *)
let ( |> ) arg fn = fn arg
(* Left-to-right composition: [(f |- g) x] is [g (f x)]. *)
let ( |- ) first second = fun x -> second (first x)
(* [tap f x] applies [f] to [x] for its side effect only, then returns [x]
   unchanged. *)
let tap f x =
  ignore (f x);
  x
(* Worker main loop on the connected [socket].

   Protocol: the peer sends values of the form [Some (path, test)], encoded
   with [Marshal]; the worker runs [test ()] and answers with the
   marshalled pair (path, result). Receiving [None], or the peer closing
   the connection, makes the worker exit with status 0.

   Messages are exchanged through buffered channels, so each request is
   read in full and exactly once: [Marshal] knows the size of every value
   it reads, and the connection does not need to be closed between two
   messages. Answers are flushed as soon as they are written.

   NOTE(review): unmarshalling is not type-safe and requests contain
   closures, which per the Marshal documentation can only be read back by
   the very same program that wrote them. Only connect this worker to a
   trusted, compatible peer. *)
let runner socket =
  let ic = Unix.in_channel_of_descr socket in
  let oc = Unix.out_channel_of_descr socket in

  (* Run one test and pair its path with the outcome. *)
  let run_test (path, test) =
    try
      test ();
      (path, RSuccess)
    with e ->
      let backtrace =
        if Printexc.backtrace_status ()
        then Some (Printexc.get_backtrace ())
        else None
      in
      begin
        match e with
        | Failure s -> (path, RFailure (s, backtrace))
        | Skip s -> (path, RSkip s)
        | Todo s -> (path, RTodo s)
        | s -> (path, RError (Printexc.to_string s, backtrace))
      end
  in

  (* Blocks until a complete request is available. A closed connection is
     treated like the [None] end marker. *)
  let next_request () =
    try
      Marshal.from_channel ic
    with End_of_file ->
      None
  in

  let rec loop () : unit =
    match next_request () with
    | None ->
        exit 0
    | Some job ->
        Marshal.to_channel oc (run_test job) [Marshal.Closures];
        flush oc;
        loop ()
  in
  loop ()
(* Entry point: [Sys.argv.(1)] is the TCP port to connect to on the loopback
   interface. A missing or non-numeric port prints a notice and exits with
   status 64; otherwise the worker connects and enters [runner]. *)
let () =
  let misuse () =
    print_endline "oUnitWorker is only called by OUnit !";
    exit 64
  in
  if Array.length Sys.argv < 2 then
    misuse ()
  else
    let port =
      try
        int_of_string Sys.argv.(1)
      with Failure _ ->
        misuse ()
    in
    let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
    Unix.connect sock (Unix.ADDR_INET (Unix.inet_addr_loopback, port));
    runner sock
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.