Skip to content

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
/// Maps every element of `input` with `map` and combines the mapped values
/// with `reduce`.
///
/// Every `map` computation is started while `input` is enumerated. A
/// MailboxProcessor collects the results and starts one `reduce` for each
/// pair of values it receives, so values are combined in the order in which
/// they arrive at the agent rather than in input order.
///
/// The returned computation fails with `ArgumentException` when `input` is
/// empty, and forwards the first exception or cancellation reported by a
/// started `map` or `reduce` computation instead of waiting forever.
let mapReduce (map : 'T1 -> Async<'T2>)
              (reduce : 'T2 -> 'T2 -> Async<'T2>)
              (input : seq<'T1>) : Async<'T2> =
    // Starts `work` and posts its outcome (value, exception or cancellation)
    // as a single message, so the agent is the only place that decides which
    // outer continuation gets called.
    let run (work : Async<'T2>)
            (post : Choice<'T2, exn, System.OperationCanceledException> -> unit) =
        Async.StartWithContinuations(
            work,
            (fun value -> post (Choice1Of3 value)),
            (fun failure -> post (Choice2Of3 failure)),
            (fun cancelled -> post (Choice3Of3 cancelled)))
    Async.FromContinuations <| fun (ok, error, cancel) ->
        // Number of inputs; assigned before the agent is started, which is
        // the first moment the agent body reads it.
        let count = ref 0
        let agent =
            new MailboxProcessor<_>(fun chan ->
                // `pending` holds a value still waiting for a partner.
                // `remaining` counts the values still expected: n map results
                // plus n - 1 reduce results, the last of which is the answer.
                let rec loop pending remaining =
                    async {
                        let! message = chan.Receive()
                        match message, pending with
                        | Choice2Of3 failure, _ -> return error failure
                        | Choice3Of3 cancelled, _ -> return cancel cancelled
                        | Choice1Of3 result, _ when remaining = 1 -> return ok result
                        | Choice1Of3 y, Some x ->
                            run (reduce x y) chan.Post
                            return! loop None (remaining - 1)
                        | Choice1Of3 x, None ->
                            return! loop (Some x) (remaining - 1)
                    }
                loop None (2 * count.Value - 1))
        let started =
            (0, input)
            ||> Seq.fold (fun n x ->
                    run (map x) agent.Post
                    n + 1)
        count.Value <- started
        if started = 0 then
            // Nothing will ever be posted, so report the problem right away
            // instead of starting an agent that would wait forever.
            error (System.ArgumentException("The input sequence was empty.", "input") :> exn)
        else
            agent.Start()
/// Maps every element of `input` with `mapF` and combines the results with
/// `reduceF` by divide and conquer.
///
/// The input is copied into an array and the index range is split in half
/// recursively. Both halves run through `Async.Parallel`, and their results
/// are combined as `reduceF left right`, so results are combined in input
/// order.
///
/// The returned computation fails with `ArgumentException` when `input` is
/// empty.
let mapReduce' (mapF : 'T1 -> Async<'T2>)
               (reduceF : 'T2 -> 'T2 -> Async<'T2>)
               (input : seq<'T1>) : Async<'T2> =
    // Materialize the sequence once so it can be addressed by index.
    let items = Array.ofSeq input
    // Runs two computations in parallel and returns both results as a pair.
    let (<||>) first second =
        async {
            let! results = Async.Parallel [| first; second |]
            return results.[0], results.[1]
        }
    // Map-reduces the half-open index range [lo, hi); expects lo < hi.
    let rec reduceRange lo hi =
        async {
            if lo + 1 >= hi then
                return! mapF items.[lo]
            else
                // Same midpoint as (lo + hi) / 2, without overflowing the sum.
                let mid = lo + (hi - lo) / 2
                let! (left, right) = reduceRange lo mid <||> reduceRange mid hi
                return! reduceF left right
        }
    if items.Length = 0 then
        // Fail when the computation runs, with an error that names the
        // argument rather than an index error from the internal array.
        async { return invalidArg "input" "The input sequence was empty." }
    else
        reduceRange 0 items.Length
/// Timing harness for a map-reduce implementation.
///
/// Calls `mapReduce` with the inputs 0 .. n, a mapping step that returns its
/// argument (sleeping 10 seconds first for the element equal to `n`), and a
/// reducing step that sleeps 1 second and then adds its two arguments. The
/// resulting computation is run synchronously and its result is returned.
let test mapReduce n =
    // Identity mapping; only the element equal to `n` is delayed.
    let delayedIdentity value =
        async {
            if value = n then
                do! Async.Sleep 10000
            return value
        }
    // Addition with a fixed one-second delay per combination.
    let delayedSum left right =
        async {
            do! Async.Sleep 1000
            return left + right
        }
    let inputs = [| 0 .. n |]
    mapReduce delayedIdentity delayedSum inputs
    |> Async.RunSynchronously
// Toggle F# Interactive's timing output so the elapsed time of each run below is reported.
#time
// Time the MailboxProcessor-based implementation on the inputs 0 .. 100.
test mapReduce 100
// Time the divide-and-conquer implementation on the same inputs.
test mapReduce' 100
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.