public
Last active

  • Download Gist
MapReduce.fs
F#
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
/// Applies `map` to every element of `input` and folds the mapped values
/// together with `reduce` as they become available.
///
/// Every mapper is started while `input` is enumerated; a mailbox agent then
/// pairs up values in the order they arrive and starts one `reduce` per pair,
/// so `reduce` should be associative and commutative for a deterministic
/// result.
///
/// The returned computation
///   * yields the single remaining value once all reductions have finished;
///   * fails with the first exception reported by any `map`/`reduce` call;
///   * is cancelled if any `map`/`reduce` computation reports cancellation;
///   * fails with `System.ArgumentException` when `input` is empty, because
///     there is no value of type 'T2 to return.
let mapReduce (map : 'T1 -> Async<'T2>)
              (reduce : 'T2 -> 'T2 -> Async<'T2>)
              (input : seq<'T1>) : Async<'T2> =
    // Starts `work` and forwards its outcome (value, exception or
    // cancellation) to `post` as exactly one message. Wrapping the call in
    // `async { ... }` also captures exceptions thrown while *creating* the
    // computation, not only while running it.
    let run (work : unit -> Async<'T2>)
            (post : Choice<'T2, exn, System.OperationCanceledException> -> unit) =
        Async.StartWithContinuations(
            async { return! work () },
            (fun value -> post (Choice1Of3 value)),
            (fun err -> post (Choice2Of3 err)),
            (fun cancelled -> post (Choice3Of3 cancelled)))
    Async.FromContinuations <| fun (ok, error, cancel) ->
        // Number of inputs. Written once, before the agent is started, and
        // read once, when the agent body begins executing.
        let total = ref 0
        let agent =
            new MailboxProcessor<Choice<'T2, exn, System.OperationCanceledException>>(fun chan ->
                // `remaining` = reductions still to be started,
                // `pending`   = a value waiting for a partner.
                // The agent is the only place that invokes a continuation and
                // it stops right after doing so, which guarantees that
                // exactly one continuation is called exactly once.
                let rec loop remaining pending =
                    async {
                        let! msg = chan.Receive()
                        match msg, pending with
                        | Choice1Of3 value, _ when remaining = 0 ->
                            // All reductions were started and this is the
                            // outcome of the last one (or the only input).
                            ok value
                        | Choice1Of3 value, Some first ->
                            run (fun () -> reduce first value) chan.Post
                            return! loop (remaining - 1) None
                        | Choice1Of3 value, None ->
                            return! loop remaining (Some value)
                        | Choice2Of3 err, _ -> error err
                        | Choice3Of3 cancelled, _ -> cancel cancelled
                    }
                loop (total.Value - 1) None)
        // Messages posted before `Start` are queued by the mailbox, so the
        // mappers can be launched while the inputs are still being counted.
        total.Value <-
            (0, input)
            ||> Seq.fold (fun seen item ->
                run (fun () -> map item) agent.Post
                seen + 1)
        if total.Value = 0 then
            // Without this check the agent would wait forever for a value.
            error (System.ArgumentException("The input sequence was empty.", "input") :> exn)
        else
            agent.Start()
 
/// Divide-and-conquer map/reduce: `input` is copied into an array, the index
/// range is halved recursively, both halves are evaluated with
/// `Async.Parallel`, and their results are merged with `reduceF` (left half
/// first). Ranges of one element are resolved by calling `mapF`.
///
/// Note: for an empty `input` the range [0, 0) is also treated as a leaf, so
/// running the returned computation fails on the array access.
let mapReduce' (mapF : 'T1 -> Async<'T2>)
               (reduceF : 'T2 -> 'T2 -> Async<'T2>)
               (input : seq<'T1>) : Async<'T2> =
    let items = Seq.toArray input
    // Runs two computations in parallel and returns their results as a pair.
    let both lhs rhs =
        async {
            let! outcomes = Async.Parallel [| lhs; rhs |]
            return outcomes.[0], outcomes.[1]
        }
    // Solves the half-open index range [lo, hi).
    let rec solve lo hi =
        async {
            if hi - lo > 1 then
                let mid = (lo + hi) / 2
                let! (left, right) = both (solve lo mid) (solve mid hi)
                return! reduceF left right
            else
                return! mapF items.[lo]
        }
    solve 0 items.Length
 
/// Benchmark harness: runs the supplied `mapReduce` implementation over the
/// array [| 0 .. n |] and blocks until the result is available.
/// The mapper is the identity but pauses 10000 ms on the element equal to
/// `n`; the reducer adds its two arguments after pausing 1000 ms.
let test mapReduce n =
    // Identity mapper with one deliberately slow element (x = n).
    let slowLast x =
        async {
            if x = n then
                do! Async.Sleep 10000
            return x
        }
    // Addition that costs one second per merge.
    let slowSum x y =
        async {
            do! Async.Sleep 1000
            return x + y
        }
    [| 0 .. n |]
    |> mapReduce slowLast slowSum
    |> Async.RunSynchronously
 
// F# Interactive directive: toggles the display of timing information for
// the evaluations that follow.
#time
// Run the same workload (n = 100) through each implementation in turn.
test mapReduce 100
test mapReduce' 100

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.