public
Created

OUnit patch proposal for parallelism

  • Download Gist
oUnitCore.diff
Diff
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
--- old-ounit/src/oUnitCore.ml 2013-03-16 18:45:34.806295575 +0100
+++ new-ounit/src/oUnitCore.ml 2013-03-16 18:45:34.814295575 +0100
@@ -14,6 +14,11 @@
* Types and global states.
*)
+type runner_type =
+ | Sequential
+ | Threaded
+ | Processus
+
let global_verbose =
OUnitConf.make
"verbose"
@@ -51,19 +56,21 @@
let global_chooser = ref OUnitChooser.simple
+let thread_pool_threshold = 10
+(** Under this limit, create exactly one thread by test *)
+let thread_pool_size = 15
+(** Should be adjusted but depends of too many parameters, make you own tests *)
+
(* Events which can happen during testing *)
-(* Run all tests, report starts, errors, failures, and return the results *)
-let perform_test logger test =
+let report path e = OUnitLogger.report !global_logger (TestEvent (path, e))
- let report path e =
- OUnitLogger.report logger (TestEvent (path, e))
- in
-
- let run_test_case f path =
- let result =
- try
+(* Run all tests, sequential version *)
+let run_all_tests_seq test_cases =
+ let run_test_case f path =
+ let result =
+ try
f ();
RSuccess
with e ->
@@ -73,17 +80,169 @@
else
None
in
- match e with
+ match e with
| Failure s -> RFailure (s, backtrace)
| Skip s -> RSkip s
| Todo s -> RTodo s
| s -> RError (Printexc.to_string s, backtrace)
+ in
+ let position =
+ OUnitLogger.position !global_logger
+ in
+ result, position
+ in
+ let runner (path, f) =
+ let result, position =
+ report path EStart;
+ run_test_case f path
in
- let position =
- OUnitLogger.position logger
+ report path (EResult result);
+ report path EEnd;
+ path, result, position
+ in
+ let rec iter state =
+ match state.tests_planned with
+ | [] ->
+ state.results
+ | _ ->
+ let (path, f) = !global_chooser state in
+ let result = runner (path, f) in
+ iter
+ {
+ results = result :: state.results;
+ tests_planned =
+ List.filter
+ (fun (path', _) -> path <> path')
+ state.tests_planned
+ }
+ in
+ iter {results = []; tests_planned = test_cases}
+
+(* Run all test, threaded version *)
+let run_all_tests_threaded test_cases =
+ (* perform_test.run_test_case equivalent *)
+ let thread_run test_fun = try
+ test_fun (); RSuccess
+ with e -> (* No backtraces because I suspect them to not be thread-safe *)
+ match e with
+ | Failure s -> RFailure (s, None)
+ | Skip s -> RSkip s
+ | Todo s -> RTodo s
+ | s -> RError (Printexc.to_string s, None)
+ in
+
+ (* thread-wide synchronization *)
+ let thread_main (wait_chan, result_chan) =
+ while true do
+ let event = Event.receive wait_chan in
+ let (test_path, test_fun) = Event.sync event in
+ report test_path EStart; (* FIXME *)
+ (* seems a very bad idea as report is NOT thread safe, broking lines in *)
+ (* the log file, maybe more one day. Find an other solution of patch*)
+ (* report *)
+ let test_res = thread_run test_fun in
+ Event.sync (Event.send result_chan (test_path, test_res))
+ done
+ in
+
+ (* application-wide synchronization, end of perfom_test.runner equivalent *)
+ let synchronizer_main (test_number, result_chan, suite_result_chan) =
+ let i = ref test_number and l = ref [] in
+ while !i > 0 do
+ let (path, res) = Event.sync (Event.receive result_chan) in
+ report path (EResult res);
+ report path EEnd;
+ l := (path, res, None)::!l;
+ decr i
+ done;
+ Event.sync (Event.send suite_result_chan !l)
+ in
+
+ (* beginning of preform_test.runn equivalent, wait results from synchronizer *)
+ let rec schedule wait_chan suite_result_chan = function
+ | [] -> Event.sync (Event.receive suite_result_chan)
+ | test::tests_planned ->
+ Event.sync (Event.send wait_chan test);
+ schedule wait_chan suite_result_chan tests_planned
+ in
+ (* init channels to pass values with easy synchronization *)
+ let len = List.length test_cases
+ and wait_chan = Event.new_channel () (* threads will get tests by there *)
+ and result_chan = Event.new_channel () (* and send test result here *)
+ and suite_result_chan = Event.new_channel () (* theses result will be
+ * aggregated here
+ *)
+ in
+ (* then init our threads: a pool, a scheduler that dispatches tests,*)
+ (* and a synchronizer that aggregates result and call the logger *)
+ let pool_size = if len < thread_pool_threshold
+ then len else thread_pool_size
+ in
+ Thread.create synchronizer_main (len, result_chan, suite_result_chan);
+ for i = 0 to pool_size do
+ Thread.create thread_main (wait_chan, result_chan)
+ done;
+ schedule wait_chan suite_result_chan test_cases
+
+
+
+
+let base_port = 32757 (* last 5 md5 of "ocaml" *)
+
+(* Run all tests, processus version *)
+let run_all_tests_process test_cases =
+
+ let mk_sock () =
+ (* we must choose PF_INET as it's the only portable choice *)
+ let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
+ Unix.bind
+ sock
+ (Unix.ADDR_INET (Unix.inet_addr_loopback, base_port));
+ sock
+
+ and mk_process nb =
+ let l = ref [] in
+ for i = 0 to nb do
+ l := (Unix.create_process
+ "oUnitWorker"
+ [| (string_of_int base_port) |]
+ Unix.stdin
+ Unix.stdout
+ Unix.stderr) :: !l
+ done;
+ !l
+
+ (* send a test in one socket *)
+ and send_test (path, test) sock =
+ report path EStart;
+ let data = Marshal.to_string (Some (path, test)) [Marshal.Closures] in
+ if 0 = Unix.write sock data 0 (String.length data)
+ then () (* error handling *)
+ else ()
+
+ (* read a result from one socket then report it *)
+ and recv_test sock =
+ let buff = Buffer.create 4096
+ and str = ref (String.make 4096 (Char.chr 0))
in
- result, position
+ while 0 < Unix.read sock !str 0 (String.length !str) do
+ Buffer.add_string buff !str;
+ str := String.make 4096 (Char.chr 0)
+ done;
+ let (path, res) = Marshal.from_string (Buffer.contents buff) 0 in
+ report path EEnd;
+ (path, res)
+
+ in
+
+ let sock = mk_sock ()
+ and process = mk_process 5
in
+ (* TODO accept on socket, then select on the resulting sockets *)
+ []
+
+(* Run all tests, report starts, errors, failures, and return the results *)
+let perform_test logger test =
let rec flatten_test path acc =
function
| TestCase(f) ->
@@ -96,39 +255,18 @@
((ListItem cnt)::path)
acc t)
acc tests
-
| TestLabel (label, t) ->
flatten_test ((Label label)::path) acc t
in
let test_cases =
List.rev (flatten_test [] [] test)
in
- let runner (path, f) =
- let result, position =
- report path EStart;
- run_test_case f path
- in
- report path (EResult result);
- report path EEnd;
- path, result, position
- in
- let rec iter state =
- match state.tests_planned with
- | [] ->
- state.results
- | _ ->
- let (path, f) = !global_chooser state in
- let result = runner (path, f) in
- iter
- {
- results = result :: state.results;
- tests_planned =
- List.filter
- (fun (path', _) -> path <> path')
- state.tests_planned
- }
- in
- iter {results = []; tests_planned = test_cases}
+ let run_type = Processus in
+ match run_type with (* TODO use a parameter in the function to switch *)
+ | Sequential -> run_all_tests_seq test_cases
+ | Threaded -> run_all_tests_threaded test_cases
+ | Processus -> run_all_tests_process test_cases
+
(* A simple (currently too simple) text based test runner *)
let run_test_tt ?verbose test =
@@ -164,7 +302,7 @@
(* Now start the test *)
let running_time, test_results =
time_fun
- perform_test
+ perform_test
logger
test
in
oUnitCore.ml
OCaml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381
(***********************************************************************)
(* The OUnit library *)
(* *)
(* Copyright (C) 2002-2008 Maas-Maarten Zeeman. *)
(* Copyright (C) 2010 OCamlCore SARL *)
(* *)
(* See LICENSE for details. *)
(***********************************************************************)
 
open OUnitUtils
include OUnitTypes
 
(*
* Types and global states.
*)
 
type runner_type =
| Sequential
| Threaded
| Processus
 
let global_verbose =
OUnitConf.make
"verbose"
(fun r -> Arg.Set r)
~printer:string_of_bool
false
"Run test in verbose mode."
 
let global_output_file =
let pwd = Sys.getcwd () in
let ocamlbuild_dir = Filename.concat pwd "_build" in
let dir =
if Sys.file_exists ocamlbuild_dir && Sys.is_directory ocamlbuild_dir then
ocamlbuild_dir
else
pwd
in
let fn = Filename.concat dir "oUnit.log" in
OUnitConf.make
"output_file"
~arg_string:"fn"
~alternates:["no_output_file",
(fun r -> Arg.Unit (fun () -> r:= None)),
None,
"Prevent to write log in a file."]
~printer:(function
| None -> "<none>"
| Some fn -> Printf.sprintf "%S" fn)
(fun r -> Arg.String (fun s -> r := Some s))
(Some fn)
"Output verbose log in the given file."
 
(* TODO: remove *)
let global_logger = ref OUnitLogger.null_logger
 
let global_chooser = ref OUnitChooser.simple
 
let thread_pool_threshold = 10
(** Under this limit, create exactly one thread by test *)
let thread_pool_size = 15
(** Should be adjusted but depends of too many parameters, make you own tests *)
 
(* Events which can happen during testing *)
 
 
let report path e = OUnitLogger.report !global_logger (TestEvent (path, e))
 
(* Run all tests, sequential version *)
let run_all_tests_seq test_cases =
let run_test_case f path =
let result =
try
f ();
RSuccess
with e ->
let backtrace =
if Printexc.backtrace_status () then
Some (Printexc.get_backtrace ())
else
None
in
match e with
| Failure s -> RFailure (s, backtrace)
| Skip s -> RSkip s
| Todo s -> RTodo s
| s -> RError (Printexc.to_string s, backtrace)
in
let position =
OUnitLogger.position !global_logger
in
result, position
in
let runner (path, f) =
let result, position =
report path EStart;
run_test_case f path
in
report path (EResult result);
report path EEnd;
path, result, position
in
let rec iter state =
match state.tests_planned with
| [] ->
state.results
| _ ->
let (path, f) = !global_chooser state in
let result = runner (path, f) in
iter
{
results = result :: state.results;
tests_planned =
List.filter
(fun (path', _) -> path <> path')
state.tests_planned
}
in
iter {results = []; tests_planned = test_cases}
 
(* Run all test, threaded version *)
let run_all_tests_threaded test_cases =
(* perform_test.run_test_case equivalent *)
let thread_run test_fun = try
test_fun (); RSuccess
with e -> (* No backtraces because I suspect them to not be thread-safe *)
match e with
| Failure s -> RFailure (s, None)
| Skip s -> RSkip s
| Todo s -> RTodo s
| s -> RError (Printexc.to_string s, None)
in
 
(* thread-wide synchronization *)
let thread_main (wait_chan, result_chan) =
while true do
let event = Event.receive wait_chan in
let (test_path, test_fun) = Event.sync event in
report test_path EStart; (* FIXME *)
(* seems a very bad idea as report is NOT thread safe, broking lines in *)
(* the log file, maybe more one day. Find an other solution of patch*)
(* report *)
let test_res = thread_run test_fun in
Event.sync (Event.send result_chan (test_path, test_res))
done
in
 
(* application-wide synchronization, end of perfom_test.runner equivalent *)
let synchronizer_main (test_number, result_chan, suite_result_chan) =
let i = ref test_number and l = ref [] in
while !i > 0 do
let (path, res) = Event.sync (Event.receive result_chan) in
report path (EResult res);
report path EEnd;
l := (path, res, None)::!l;
decr i
done;
Event.sync (Event.send suite_result_chan !l)
in
 
(* beginning of preform_test.runn equivalent, wait results from synchronizer *)
let rec schedule wait_chan suite_result_chan = function
| [] -> Event.sync (Event.receive suite_result_chan)
| test::tests_planned ->
Event.sync (Event.send wait_chan test);
schedule wait_chan suite_result_chan tests_planned
in
(* init channels to pass values with easy synchronization *)
let len = List.length test_cases
and wait_chan = Event.new_channel () (* threads will get tests by there *)
and result_chan = Event.new_channel () (* and send test result here *)
and suite_result_chan = Event.new_channel () (* theses result will be
* aggregated here
*)
in
(* then init our threads: a pool, a scheduler that dispatches tests,*)
(* and a synchronizer that aggregates result and call the logger *)
let pool_size = if len < thread_pool_threshold
then len else thread_pool_size
in
Thread.create synchronizer_main (len, result_chan, suite_result_chan);
for i = 0 to pool_size do
Thread.create thread_main (wait_chan, result_chan)
done;
schedule wait_chan suite_result_chan test_cases
 
 
 
 
let base_port = 32757 (* last 5 md5 of "ocaml" *)
 
(* Run all tests, processus version *)
let run_all_tests_process test_cases =
 
let mk_sock () =
(* we must choose PF_INET as it's the only portable choice *)
let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
Unix.bind
sock
(Unix.ADDR_INET (Unix.inet_addr_loopback, base_port));
sock
 
and mk_process nb =
let l = ref [] in
for i = 0 to nb do
l := (Unix.create_process
"oUnitWorker"
[| (string_of_int base_port) |]
Unix.stdin
Unix.stdout
Unix.stderr) :: !l
done;
!l
 
(* send a test in one socket *)
and send_test (path, test) sock =
report path EStart;
let data = Marshal.to_string (Some (path, test)) [Marshal.Closures] in
if 0 = Unix.write sock data 0 (String.length data)
then () (* error handling *)
else ()
 
(* read a result from one socket then report it *)
and recv_test sock =
let buff = Buffer.create 4096
and str = ref (String.make 4096 (Char.chr 0))
in
while 0 < Unix.read sock !str 0 (String.length !str) do
Buffer.add_string buff !str;
str := String.make 4096 (Char.chr 0)
done;
let (path, res) = Marshal.from_string (Buffer.contents buff) 0 in
report path EEnd;
(path, res)
 
in
 
let sock = mk_sock ()
and process = mk_process 5
in
(* TODO accept on socket, then select on the resulting sockets *)
[]
 
(* Run all tests, report starts, errors, failures, and return the results *)
let perform_test logger test =
let rec flatten_test path acc =
function
| TestCase(f) ->
(path, f) :: acc
 
| TestList (tests) ->
fold_lefti
(fun acc t cnt ->
flatten_test
((ListItem cnt)::path)
acc t)
acc tests
| TestLabel (label, t) ->
flatten_test ((Label label)::path) acc t
in
let test_cases =
List.rev (flatten_test [] [] test)
in
let run_type = Processus in
match run_type with (* TODO use a parameter in the function to switch *)
| Sequential -> run_all_tests_seq test_cases
| Threaded -> run_all_tests_threaded test_cases
| Processus -> run_all_tests_process test_cases
 
 
(* A simple (currently too simple) text based test runner *)
let run_test_tt ?verbose test =
let () =
Printexc.record_backtrace true
in
 
let base_logger =
OUnitLogger.create
(global_output_file ())
(global_verbose ())
OUnitLogger.null_logger
in
let html_logger =
OUnitLoggerHTML.create ()
in
let junit_logger =
OUnitLoggerJUnit.create ()
in
let logger =
OUnitLogger.combine
[base_logger; html_logger; junit_logger]
in
let () =
(* TODO: is it really useful to override this ? *)
global_logger := logger
in
 
let () =
OUnitConf.dump (OUnitLogger.report logger)
in
 
(* Now start the test *)
let running_time, test_results =
time_fun
perform_test
logger
test
in
(* Print test report *)
OUnitLogger.report logger
(GlobalEvent
(GResults (running_time, test_results, test_case_count test)));
 
(* Reset logger. *)
OUnitLogger.close logger;
global_logger := OUnitLogger.null_logger;
 
(* Return the results possibly for further processing *)
test_results
(* Call this one from you test suites *)
let run_test_tt_main ?(arg_specs=[]) ?(set_verbose=ignore) ?fexit suite =
let fexit =
match fexit with
| Some f -> f
| None ->
(fun test_results ->
if not (was_successful test_results) then
exit 1)
in
let only_test =
OUnitConf.make
"only_test"
~arg_string:"path"
~printer:(fun lst -> String.concat "," (List.map (Printf.sprintf "%S") lst))
(fun r -> Arg.String (fun str -> r := str :: !r))
[]
"Run only the selected tests."
in
let list_test =
OUnitConf.make
"list_test"
(fun r -> Arg.Set r)
~printer:string_of_bool
false
"List tests"
in
let () =
OUnitConf.load arg_specs
in
if list_test () then
begin
List.iter
(fun pth -> print_endline (string_of_path pth))
(OUnitTest.test_case_paths suite)
end
else
begin
let nsuite =
if only_test () = [] then
suite
else
begin
match OUnitTest.test_filter ~skip:true (only_test ()) suite with
| Some test ->
test
| None ->
failwith
(Printf.sprintf
"Filtering test %s lead to no tests."
(String.concat ", " (only_test ())))
end
in
 
let test_results =
set_verbose (global_verbose ());
run_test_tt ~verbose:(global_verbose ()) nsuite
in
fexit test_results
end
oUnitWorker.ml
OCaml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
(** OUnitWorker
*
* A OUnitWorker is a single process that wait for a socket, read test, execute
* it, and return the result on the socket
*
* This need to be done in another process because ocaml Threads are not truly
* concurrent. Moreover we cannot use Unix.fork because it's not portable
*)
 
open OUnitTypes
 
let (|>) f x = x f
let (|-) f g x = g (f x)
let tap f x = f x; x
 
let runner socket =
let rec wait_socket sock =
let (socks, _ , _) = (Unix.select [socket] [] [] (-1.)) in
match socks with
| [] -> wait_socket sock
| s_test_cases::l -> s_test_cases
 
and read_socket sock =
let buff = Buffer.create 4096
and str = ref (String.make 4096 (Char.chr 0))
in
while 0 < Unix.read sock !str 0 4096 do
Buffer.add_string buff !str;
str := String.make 4096 (Char.chr 0)
done;
Buffer.contents buff
 
and run_test = function
| None -> ("", RSuccess)
| Some (path, test) ->
try
test (); (path, RSuccess)
with e ->
let backtrace =
if Printexc.backtrace_status ()
then Some (Printexc.get_backtrace ())
else None
in
match e with
| Failure s -> (path, RFailure (s, backtrace))
| Skip s -> (path, RSkip s)
| Todo s -> (path, RTodo s)
| s -> (path, RError (Printexc.to_string s, backtrace))
 
and write_socket sock str =
if 0 = Unix.single_write sock str 0 (String.length str)
then () (* TODO error handling : exit ? log something ? *)
else ()
 
and quit_if_end = function
| None -> exit 0
| Some _ -> ()
 
in
while true do
wait_socket socket
|> read_socket
|> (fun s -> Marshal.from_string s 0)
|> tap quit_if_end (* if None is send, exit here *)
|> run_test
|> (fun res -> Marshal.to_string res [Marshal.Closures])
|> (write_socket socket)
done
 
let _ =
if Array.length Sys.argv < 2
then (print_endline "oUnitWorker is only called by OUnit !"; exit 64)
else
let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
int_of_string Sys.argv.(1)
|> fun s -> (Unix.inet_addr_loopback, s)
|> fun (a,b) -> Unix.ADDR_INET (a,b)
|> (Unix.connect sock);
runner sock

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.