Skip to content

Instantly share code, notes, and snippets.

@t0yv0
Created July 26, 2010 16:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save t0yv0/490816 to your computer and use it in GitHub Desktop.
Save t0yv0/490816 to your computer and use it in GitHub Desktop.
open System.Collections.Generic
/// A thread-safe channel that pairs readers with writers in FIFO order.
///
/// Pending readers are kept as futures that a later writer fulfils; pending
/// writers are kept as thunks that yield the written value when a reader
/// takes it. All queue manipulation happens under a single lock, but the
/// code that wakes the other party (fulfilling a future or running a writer
/// thunk) always runs after the lock has been released, so waiter
/// continuations never execute while the channel is locked.
[<Sealed>]
type Channel<'T>() =
    let sync = obj ()
    // Readers that arrived before any value was available.
    let readers = Queue<Future<'T>>()
    // Values (wrapped in thunks) that arrived before any reader was waiting.
    let writers = Queue<unit -> 'T>()

    /// True when at least one written value is waiting to be read.
    member this.CanRead : bool =
        lock sync <| fun () -> writers.Count > 0

    /// True when at least one reader is waiting for a value.
    member this.CanWrite : bool =
        lock sync <| fun () -> readers.Count > 0

    /// Takes the oldest pending value without waiting.
    /// Returns None when no writer is queued.
    member this.TryRead() : option<'T> =
        let pending =
            lock sync <| fun () ->
                if writers.Count = 0 then None else Some (writers.Dequeue())
        // The thunk may resume a blocked writer, so run it outside the lock.
        match pending with
        | Some take -> Some (take ())
        | None -> None

    /// Hands the value to the oldest waiting reader without waiting.
    /// Returns false (and discards nothing) when no reader is queued.
    member this.TryWrite(value: 'T) : bool =
        let waiting =
            lock sync <| fun () ->
                if readers.Count = 0 then None else Some (readers.Dequeue())
        match waiting with
        | Some reader ->
            // Resumes the reader's continuation; must not hold the lock.
            reader.Provide value
            true
        | None -> false

    /// Returns an async that yields the next value. If a value is already
    /// queued it is taken immediately; otherwise the caller is registered as
    /// a pending reader. Note the queueing happens when Read() is called,
    /// not when the returned async is started.
    member this.Read() : Async<'T> =
        let outcome =
            lock sync <| fun () ->
                if writers.Count = 0 then
                    let f = Future<'T>()
                    readers.Enqueue f
                    Choice1Of2 f.AsyncValue
                else
                    Choice2Of2 (writers.Dequeue())
        match outcome with
        | Choice1Of2 pending -> pending
        | Choice2Of2 take -> async.Return(take ())

    /// Returns an async that completes once a reader has taken the value.
    /// If a reader is already waiting it receives the value immediately.
    member this.Write(value: 'T) : Async<unit> =
        let outcome =
            lock sync <| fun () ->
                if readers.Count = 0 then
                    let f = Future<unit>()
                    // When a reader dequeues this thunk it first signals the
                    // writer's completion, then receives the value.
                    writers.Enqueue(fun () ->
                        f.Provide()
                        value)
                    Choice1Of2 f.AsyncValue
                else
                    Choice2Of2 (readers.Dequeue())
        match outcome with
        | Choice1Of2 completion -> completion
        | Choice2Of2 reader ->
            reader.Provide value
            async.Return()

    /// Fire-and-forget write: delivers the value to a waiting reader, or
    /// queues it for a future reader, and returns without waiting.
    member this.Send(value: 'T) : unit =
        let waiting =
            lock sync <| fun () ->
                if readers.Count = 0 then
                    writers.Enqueue(fun () -> value)
                    None
                else
                    Some (readers.Dequeue())
        match waiting with
        | Some reader -> reader.Provide value
        | None -> ()
open System.Collections.Generic
/// A write-once cell whose value can be awaited asynchronously.
///
/// Until Provide is called, awaiting computations are parked as
/// continuations; Provide stores the value and resumes all of them.
/// State transitions happen under a lock, but continuations are always
/// invoked after the lock is released so that user code never runs while
/// the future is locked.
[<Sealed>]
type Future<'T>() =
    let sync = obj ()
    let mutable state = Waiting (Queue<_>())

    let asyncValue =
        Async.FromContinuations <| fun (ok, _, _) ->
            // Either capture the already-available value or park the
            // continuation; decide under the lock, act outside of it.
            let ready =
                lock sync <| fun () ->
                    match state with
                    | Ready x -> Some x
                    | Waiting queue ->
                        queue.Enqueue ok
                        None
            match ready with
            | Some x -> ok x
            | None -> ()

    /// Sets the value and resumes every parked continuation with it.
    /// Fails with "Future.Provide called twice." if a value was already set.
    member this.Provide(value: 'T) =
        let waiters =
            lock sync <| fun () ->
                match state with
                | Ready _ ->
                    failwith "Future.Provide called twice."
                | Waiting queue ->
                    state <- Ready value
                    // Once the state is Ready nothing can reach this queue
                    // any more, so it is safe to iterate it unlocked.
                    queue
        for resume in waiters do
            resume value

    /// The value if it has been provided, otherwise None. Never blocks
    /// waiting for a value.
    member this.CurrentValue =
        lock sync <| fun () ->
            match state with
            | Ready x -> Some x
            | Waiting _ -> None

    /// An async computation that yields the value once it is provided.
    member this.AsyncValue = asyncValue

/// Internal state of a Future: either the continuations still waiting for a
/// value, or the value itself.
and private FutureState<'T> =
    | Waiting of Queue<'T -> unit>
    | Ready of 'T
module MapReduce =

    /// Maps every input concurrently and folds the mapped values with
    /// `reduce`, returning an async that yields the single combined result.
    ///
    /// Each `map` call and each `reduce` call is started as its own async
    /// and posts its result to a shared channel. Values are combined in
    /// whatever order they arrive, so `reduce` should be associative and
    /// commutative if a deterministic result is required.
    ///
    /// Running the returned async with an empty `inputs` array fails with an
    /// ArgumentException: there is no value to return, and waiting on the
    /// channel would otherwise never complete.
    let MapReduce<'T1,'T2> (map: 'T1 -> 'T2)
                           (reduce: 'T2 -> 'T2 -> 'T2)
                           (inputs: 'T1 []) =
        let chan = new Channel<'T2>()
        // `n` is the number of values still expected on the channel. Each
        // step consumes two values and schedules one combined value, so the
        // count drops by one until only the final result remains.
        let rec loop n =
            if n = 1 then
                chan.Read()
            else
                async {
                    let! x = chan.Read()
                    let! y = chan.Read()
                    do async { return chan.Send (reduce x y) }
                       |> Async.Start
                    return! loop (n - 1)
                }
        async {
            if inputs.Length = 0 then
                invalidArg "inputs" "MapReduce requires at least one input."
            for x in inputs do
                async { return chan.Send (map x) }
                |> Async.Start
            return! loop inputs.Length
        }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment