Skip to content

Instantly share code, notes, and snippets.

@t0yv0
Created October 4, 2012 17:04
Show Gist options
  • Save t0yv0/3834976 to your computer and use it in GitHub Desktop.
Save t0yv0/3834976 to your computer and use it in GitHub Desktop.
open System
open System.Collections.Generic
open System.Diagnostics
open System.Threading
open System.Threading.Tasks
/// An unbounded channel that pairs sent values with receive callbacks.
/// Values sent while nobody is receiving are buffered; receive callbacks
/// registered while nothing is buffered are parked until a value arrives.
/// All shared state is read and written inside `lock`.
[<Sealed>]
type Channel<'T>() =
    // Monitor object guarding the two queues and the counter below.
    let gate = obj ()
    // Callbacks of receivers that arrived before a value was available.
    let waiting = Queue<'T -> unit>()
    // Values that were sent before any receiver asked for them.
    let buffered = Queue<'T>()
    // Number of sends minus number of receives so far:
    // positive = values are buffered, negative = receivers are parked.
    let mutable balance = 0

    // Hands `x` to a parked receiver if there is one, otherwise buffers it.
    let enq x =
        let deliver =
            lock gate (fun () ->
                let before = balance
                balance <- before + 1
                if before >= 0 then
                    buffered.Enqueue(x)
                    ignore
                else
                    let receiver = waiting.Dequeue()
                    fun () -> receiver x)
        // The callback (if any) is invoked after the lock has been released.
        deliver ()

    // Passes a buffered value to `f` if there is one, otherwise parks `f`.
    let deq f =
        let deliver =
            lock gate (fun () ->
                let before = balance
                balance <- before - 1
                if before <= 0 then
                    waiting.Enqueue(f)
                    ignore
                else
                    let value = buffered.Dequeue()
                    fun () -> f value)
        // The callback (if any) is invoked after the lock has been released.
        deliver ()

    // Async view of `deq`: the success continuation is used as the receive callback.
    let receiveAsync =
        Async.FromContinuations(fun (onValue, _, _) -> deq onValue)

    /// Returns an async computation that yields the next value from the channel.
    member self.Receive() = receiveAsync

    /// Registers `f` to be called with the next value from the channel.
    member self.Receive(f) = deq f

    /// Sends `x` into the channel.
    member self.Send(x) = enq x
/// An agent: a private channel plus a task running the body `f` over it.
/// The body is started as a task, with the task creation options `opts`,
/// when the agent is constructed.
[<Sealed>]
type Agent<'T,'R>(f: Channel<'T> -> Async<'R>, opts) =
    // Mailbox through which messages reach the body.
    let inbox = Channel<'T>()
    // Task running the body; it completes with the body's result.
    let running = Async.StartAsTask(f inbox, opts)

    /// Posts the message `m` to the agent's channel.
    member self.Send(m) = inbox.Send(m)

    /// The task running the agent's body.
    member self.Task = running
/// Factory methods for creating agents.
[<Sealed>]
type Agent =
    /// Starts an agent running `f` with `TaskCreationOptions.None`.
    static member Start(f) =
        Agent<_, _>(f, TaskCreationOptions.None)

    /// Starts an agent running `f` with the given task creation options.
    static member Start(f, opts) =
        Agent<_, _>(f, opts)
/// Starts `f ()` as an async computation via `Async.Start`, without waiting for it.
let spawn f =
    let job = async { return f () }
    Async.Start job
/// Starts `computation` via `Async.Start` and passes its result to the
/// callback `k`, without waiting for completion.
let spawnAsync computation k =
    let job =
        async {
            let! value = computation
            return k value
        }
    Async.Start job
/// Starts all `computations` concurrently and calls `k` on their results in
/// array order (index 0 first), regardless of the order in which they finish.
/// Blocks the calling thread until `k` has been called for every computation.
///
/// If a computation raises, the exception is captured and re-raised when its
/// position in the ordering is reached; callbacks for earlier positions still
/// run. The failure then faults the agent's task and surfaces from the final
/// `Wait()`, just like an exception thrown by `k` itself. Without this, a
/// failing computation would never report to the agent and the call would
/// block forever.
let orderedParallel (computations: Async<'T> []) (k: 'T -> unit) =
    let count = computations.Length
    let agent =
        Agent.Start<int * Choice<'T, exn>, unit>(fun chan ->
            // Passes a successful result to `k`; re-raises a captured failure.
            let deliver outcome =
                match outcome with
                | Choice1Of2 x -> k x
                | Choice2Of2 (e: exn) -> raise e
            // `backlog` holds outcomes that arrived ahead of their turn,
            // keyed by index; `c` is the index that must be delivered next.
            let rec loop backlog c =
                async {
                    if c = count then
                        return ()
                    else
                        match Map.tryFind c backlog with
                        | Some outcome ->
                            do deliver outcome
                            return! loop (Map.remove c backlog) (c + 1)
                        | None ->
                            let! (n, outcome) = chan.Receive()
                            if n = c then
                                do deliver outcome
                                return! loop backlog (c + 1)
                            else
                                return! loop (Map.add n outcome backlog) c
                }
            loop Map.empty 0)
    // Launch every computation; each one reports its index and outcome
    // (result or exception) to the agent.
    spawn <| fun () ->
        computations
        |> Array.iteri (fun i computation ->
            spawnAsync (Async.Catch computation) (fun outcome -> agent.Send(i, outcome)))
    agent.Task.Wait()
/// Outcome of running a single test.
type TestResults =
    { /// Name of the test that produced this result.
      Name : string
      /// Elapsed time recorded for the run.
      Time : TimeSpan
      /// The exception raised by the test, or `None` if it passed.
      Failure : exn option }
/// A named test together with the async computation that runs it.
type Test =
    { /// Name of the test.
      Name : string
      /// Computation that executes the test and yields its results.
      Run : Async<TestResults> }
/// Prints a one-line verdict for a test (name and elapsed seconds); for a
/// failed test the exception is printed on the following line.
let printResults (results: TestResults) =
    let seconds = results.Time.TotalSeconds
    match results.Failure with
    | Some e ->
        printfn "Test %s: FAIL in %g sec" results.Name seconds
        printfn "%O" e
    | None ->
        printfn "Test %s: OK in %g sec" results.Name seconds
/// Runs the given tests concurrently, prints each result in the order the
/// tests were listed, and prints a summary at the end. Blocks the calling
/// thread until every test has reported, because `orderedParallel` waits for
/// completion before returning.
let startTests tests =
    // Ref cells rather than `let mutable`: the counters are updated from the
    // callback closure and their storage is passed by reference to Interlocked.
    let failed = ref 0
    let passed = ref 0
    // `orderedParallel` returns unit, so there is nothing to bind here.
    orderedParallel [| for t in tests -> t.Run |] (fun results ->
        printResults results
        match results.Failure with
        | Some _ -> Interlocked.Increment(&failed.contents)
        | None -> Interlocked.Increment(&passed.contents)
        |> ignore)
    printfn "DONE: %i FAILED: %i PASSED: %i"
        (failed.Value + passed.Value) failed.Value passed.Value
/// Builds a `Test` called `name` from the async computation `f`. When run, the
/// test times `f` with a stopwatch and reports either success or the exception
/// that `f` raised; the exception is captured in the results, not propagated.
let createTest name f =
    let timed =
        async {
            let watch = Stopwatch.StartNew()
            // Stops the clock and packages the outcome.
            let finish failure : TestResults =
                watch.Stop()
                { Name = name; Failure = failure; Time = watch.Elapsed }
            try
                do! f
                return finish None
            with exn ->
                return finish (Some exn)
        }
    { Name = name; Run = timed }
/// Builds a `Test` called `name` from the synchronous function `f` by wrapping
/// the call `f ()` in an async computation.
let test name f =
    let body = async { return f () }
    createTest name body
// Demo run: two synchronous tests, plus one asynchronous test that sleeps for
// one second and then divides by zero.
startTests [
    test "addition" (fun () -> ignore (1 = 1))
    test "multiplication" (fun () -> ignore (1 * 1))
    createTest "division" (async {
        do! Async.Sleep(1000)
        return ignore (1 / 0)
    })
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment