Create a gist now

Instantly share code, notes, and snippets.

/// Applies `map` to every element of `input` concurrently and combines the
/// results pairwise with `reduce` until a single value remains.
///
/// Results are combined in the order they arrive in the mailbox, not in input
/// order, so `reduce` should be associative and commutative if a
/// deterministic answer is required.
///
/// The returned computation:
///   * fails with `ArgumentException` when `input` is empty (there is no value
///     to return);
///   * fails with the first exception raised by `map` or `reduce`;
///   * is cancelled if one of the started computations reports cancellation.
let mapReduce (map : 'T1 -> Async<'T2>)
              (reduce : 'T2 -> 'T2 -> Async<'T2>)
              (input : seq<'T1>) : Async<'T2> =
    Async.FromContinuations <| fun (ok, error, cancel) ->
        // FromContinuations permits exactly one continuation call, but several
        // workers can finish or fail at the same time. The first caller to
        // claim the latch wins; every later outcome is dropped.
        let settled = ref false
        let claim () =
            lock settled (fun () ->
                let first = not settled.Value
                settled.Value <- true
                first)
        let succeed (r : 'T2) = if claim () then ok r
        let fail (e : exn) = if claim () then error e
        let abort (e : System.OperationCanceledException) = if claim () then cancel e
        // Starts `a` and hands its result to `k`; failures and cancellation
        // are forwarded to the caller instead of being discarded.
        let run (a : Async<'T2>) (k : 'T2 -> unit) =
            Async.StartWithContinuations(a, k, fail, abort)
        try
            // Enumerate the input exactly once so the element count is known
            // before the agent starts.
            let items = Array.ofSeq input
            if items.Length = 0 then
                invalidArg "input" "mapReduce requires a non-empty input sequence."
            let agent =
                new MailboxProcessor<'T2>(fun chan ->
                    async {
                        // N values need N - 1 pairwise reductions; each
                        // reduction posts its result back to this mailbox.
                        for _ in 2 .. items.Length do
                            let! x = chan.Receive()
                            let! y = chan.Receive()
                            run (reduce x y) chan.Post
                        // The one remaining message is the final result.
                        let! r = chan.Receive()
                        succeed r
                    })
            // Exceptions thrown synchronously inside the agent body (for
            // example by evaluating `reduce x y`) surface through this event.
            agent.Error.Add fail
            agent.Start()
            for x in items do
                run (map x) agent.Post
        with e ->
            // Covers failures while enumerating `input` or evaluating `map x`.
            fail e
/// Divide-and-conquer map/reduce. The input is copied into an array and an
/// index range is split in half recursively; both halves are run through
/// `Async.Parallel` and their results are combined with `reduceF`. A range of
/// at most one element is resolved by applying `mapF` to the element at its
/// lower bound.
let mapReduce' (mapF : 'T1 -> Async<'T2>)
               (reduceF : 'T2 -> 'T2 -> Async<'T2>)
               (input : seq<'T1>) : Async<'T2> =
    let items = Array.ofSeq input
    // Runs two computations via Async.Parallel and pairs up their results.
    let both (first : Async<'T2>) (second : Async<'T2>) =
        async {
            let! outcomes = Async.Parallel [| first; second |]
            return outcomes.[0], outcomes.[1]
        }
    // Resolves the half-open index range [lo, hi).
    let rec solve lo hi =
        async {
            if hi - lo > 1 then
                let mid = (lo + hi) / 2
                let! (leftResult, rightResult) = both (solve lo mid) (solve mid hi)
                return! reduceF leftResult rightResult
            else
                return! mapF items.[lo]
        }
    solve 0 items.Length
/// Benchmark harness: runs the supplied `mapReduce` implementation over the
/// array [| 0 .. n |] with deliberately slow workers and blocks until the
/// result is available.
let test mapReduce n =
    // Returns its argument unchanged; only the element equal to `n` is
    // delayed, by 10 seconds.
    let slowIdentity x =
        async {
            if x = n then
                do! Async.Sleep 10000
            return x
        }
    // Adds two values after a 1 second delay.
    let slowAdd x y =
        async {
            do! Async.Sleep 1000
            return x + y
        }
    let input = [| 0 .. n |]
    Async.RunSynchronously (mapReduce slowIdentity slowAdd input)
// Toggle F# Interactive's timing report so each call below prints how long it took.
#time
// Both runs process the 101 elements 0 .. 100; the element 100 is delayed by
// 10 s in the map step and every reduce step takes 1 s (see `test`).
// Mailbox-based implementation: combines whichever results arrive first.
test mapReduce 100
// Divide-and-conquer implementation: combines results along a fixed binary split.
test mapReduce' 100
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment