Created
July 26, 2010 16:45
-
-
Save t0yv0/490816 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open System.Collections.Generic | |
/// An unbounded, thread-safe rendezvous channel.
///
/// Readers that arrive before any value is available are parked as futures
/// in `readers`; writers that arrive before any reader are parked as thunks
/// in `writers`. All queue manipulation happens under `sync`, but futures are
/// completed and writer thunks are invoked only AFTER the lock has been
/// released, so that reader/writer continuations never run while the
/// channel lock is held.
[<Sealed>]
type Channel<'T>() =
    let sync = obj ()
    // Readers waiting for a value, oldest first.
    let readers = Queue<Future<'T>>()
    // Pending writes, oldest first. Invoking a thunk yields the written value
    // and, for `Write`, also signals the writer that its value was taken.
    let writers = Queue<unit -> 'T>()

    /// Atomically removes the oldest parked reader, if there is one.
    let tryTakeReader () : option<Future<'T>> =
        lock sync <| fun () ->
            if readers.Count = 0 then None else Some (readers.Dequeue())

    /// Atomically removes the oldest pending write, if there is one.
    let tryTakeWriter () : option<unit -> 'T> =
        lock sync <| fun () ->
            if writers.Count = 0 then None else Some (writers.Dequeue())

    /// True when at least one write is pending, i.e. `TryRead` would succeed
    /// if nothing else touches the channel in between.
    member this.CanRead : bool =
        lock sync <| fun () -> writers.Count > 0

    /// True when at least one reader is parked, i.e. `TryWrite` would succeed
    /// if nothing else touches the channel in between.
    member this.CanWrite : bool =
        lock sync <| fun () -> readers.Count > 0

    /// Takes the oldest pending value without waiting.
    /// Returns `None` when no write is pending.
    member this.TryRead() : option<'T> =
        match tryTakeWriter () with
        | Some take -> Some (take ())   // invoked outside the lock
        | None -> None

    /// Hands `value` to the oldest parked reader without waiting.
    /// Returns `false` (and drops nothing) when no reader is parked.
    member this.TryWrite(value: 'T) : bool =
        match tryTakeReader () with
        | Some reader ->
            reader.Provide value        // completed outside the lock
            true
        | None -> false

    /// Returns a workflow producing the next value from the channel.
    /// Note that the reader is registered (or the pending value consumed)
    /// when `Read()` is called, not when the returned workflow is run.
    member this.Read() : Async<'T> =
        let outcome =
            lock sync <| fun () ->
                if writers.Count = 0 then
                    let parked = Future<'T>()
                    readers.Enqueue parked
                    Choice1Of2 parked
                else
                    Choice2Of2 (writers.Dequeue())
        match outcome with
        | Choice1Of2 parked -> parked.AsyncValue
        | Choice2Of2 take -> async.Return(take ())

    /// Writes `value` and returns a workflow that completes once a reader
    /// has taken it. The write itself is registered when `Write` is called.
    member this.Write(value: 'T) : Async<unit> =
        let outcome =
            lock sync <| fun () ->
                if readers.Count = 0 then
                    let taken = Future<unit>()
                    // The thunk both releases the writer and yields the value.
                    writers.Enqueue(fun () ->
                        taken.Provide()
                        value)
                    Choice1Of2 taken
                else
                    Choice2Of2 (readers.Dequeue())
        match outcome with
        | Choice1Of2 taken -> taken.AsyncValue
        | Choice2Of2 reader ->
            reader.Provide value
            async.Return()

    /// Fire-and-forget write: delivers `value` to a parked reader if there is
    /// one, otherwise queues it for a future reader. Never waits.
    member this.Send(value: 'T) : unit =
        let reader =
            lock sync <| fun () ->
                if readers.Count = 0 then
                    writers.Enqueue(fun () -> value)
                    None
                else
                    Some (readers.Dequeue())
        match reader with
        | Some parked -> parked.Provide value
        | None -> ()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open System.Collections.Generic | |
/// A write-once, thread-safe cell whose value can be awaited asynchronously.
///
/// The state transition and waiter registration are guarded by `sync`, but
/// waiter continuations are always invoked after the lock has been released,
/// so a continuation may block or touch this future from another thread
/// without deadlocking.
[<Sealed>]
type Future<'T>() =
    let sync = obj ()
    let mutable state : FutureState<'T> = Waiting (Queue<_>())

    // Single shared workflow; each run either observes the stored value or
    // registers its success continuation to be called by `Provide`.
    let asyncValue : Async<'T> =
        Async.FromContinuations <| fun (ok, _, _) ->
            let ready =
                lock sync <| fun () ->
                    match state with
                    | Ready x -> Some x
                    | Waiting queue ->
                        queue.Enqueue ok
                        None
            // Resume outside the lock when the value was already there.
            match ready with
            | Some x -> ok x
            | None -> ()

    /// Stores `value` and resumes every waiter registered so far, in
    /// registration order.
    /// Raises an exception (via `failwith`) if a value was already provided.
    member this.Provide(value: 'T) =
        let waiters =
            lock sync <| fun () ->
                match state with
                | Ready _ ->
                    failwith "Future.Provide called twice."
                | Waiting queue ->
                    state <- Ready value
                    // After this point nobody enqueues into `queue` any more,
                    // so it is safe to iterate it without holding the lock.
                    queue
        for resume in waiters do
            resume value

    /// The provided value, or `None` if `Provide` has not been called yet.
    member this.CurrentValue =
        lock sync <| fun () ->
            match state with
            | Ready x -> Some x
            | Waiting _ -> None

    /// A workflow that completes with the value once it has been provided.
    member this.AsyncValue = asyncValue

/// Internal state of a `Future`: either the queue of continuations waiting
/// for the value, or the value itself.
and private FutureState<'T> =
    | Waiting of Queue<'T -> unit>
    | Ready of 'T
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module MapReduce =

    /// Applies `map` to every element of `inputs` concurrently and folds the
    /// results pairwise with `reduce`, returning a workflow that yields the
    /// single remaining value.
    ///
    /// Each `map` and `reduce` call is started with `Async.Start` and posts
    /// its result to a shared channel; results are combined in whatever order
    /// they arrive, so `reduce` is presumably expected to be associative and
    /// commutative (confirm against callers).
    ///
    /// The returned workflow fails with `ArgumentException` when `inputs` is
    /// empty: there is no value to return, and waiting on the channel would
    /// otherwise never complete.
    let MapReduce<'T1,'T2> (map: 'T1 -> 'T2)
                           (reduce: 'T2 -> 'T2 -> 'T2)
                           (inputs: 'T1 []) =
        let chan = new Channel<'T2>()

        // Runs `produce` as a detached workflow and posts its result.
        let post (produce: unit -> 'T2) =
            async { return chan.Send (produce ()) }
            |> Async.Start

        // `remaining` is the number of values still to be merged into one.
        let rec loop remaining =
            if remaining = 1 then
                chan.Read()
            else
                async {
                    let! x = chan.Read()
                    let! y = chan.Read()
                    // Two values consumed, one produced: one fewer remains.
                    post (fun () -> reduce x y)
                    return! loop (remaining - 1)
                }

        async {
            if inputs.Length = 0 then
                invalidArg "inputs" "MapReduce requires at least one input."
            for x in inputs do
                post (fun () -> map x)
            return! loop inputs.Length
        }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment