@mavnn
Created February 15, 2013 15:27
Throttled parallel execution.
open System.Collections.Concurrent

type JobRequest<'T> =
    {
        Id : int
        WorkItem : 'T
    }

type WorkRequest<'T> =
    | Job of JobRequest<'T>
    | End

let inline doParallelWithThrottle<'a, 'b> limit f items =
    let itemArray = Seq.toArray items
    let itemCount = Array.length itemArray
    let resultMap = ConcurrentDictionary<int, 'b>()
    // Bounded queue (capacity 1) that hands work items to the agents.
    use block = new BlockingCollection<WorkRequest<'a>>(1)
    // Signalled once every agent has seen its End message.
    use completeBlock = new BlockingCollection<unit>(1)
    // Counts agents that have finished; releases the caller after `limit` of them.
    let monitor =
        MailboxProcessor.Start(fun inbox ->
            let rec inner complete =
                async {
                    do! inbox.Receive()
                    if complete + 1 = limit then
                        completeBlock.Add(())
                        return ()
                    else
                        return! inner <| complete + 1
                }
            inner 0)
    // Each agent takes requests from the queue until it receives End.
    let createAgent () =
        MailboxProcessor.Start(
            fun inbox ->
                let rec inner () = async {
                    let! request = async { return block.Take() }
                    match request with
                    | Job job ->
                        let! result = async { return f (job.WorkItem) }
                        resultMap.AddOrUpdate(job.Id, result, fun _ _ -> result) |> ignore
                        return! inner ()
                    | End ->
                        monitor.Post ()
                        return ()
                }
                inner ()
        )
    let agents =
        [| for i in 1..limit -> createAgent() |]
    // Enqueue every item, tagged with its index so results keep their order.
    itemArray
    |> Array.mapi (fun i item -> Job { Id = i; WorkItem = item })
    |> Array.iter (block.Add)
    // One End message per agent.
    [1..limit]
    |> Seq.iter (fun x -> block.Add(End))
    // Block until the monitor reports that all agents have stopped.
    completeBlock.Take()
    let results = Array.zeroCreate itemCount
    resultMap
    |> Seq.iter (fun kv -> results.[kv.Key] <- kv.Value)
    results

@polytypic

First of all, consider what happens when f raises an exception.
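
As far as I can tell, if f throws inside an agent, that agent's loop ends without ever posting to monitor, so the caller is likely to wait forever on completeBlock.Take(). A minimal sketch of one way around it, assuming a hypothetical helper tryApply (not part of the gist) and F# 4.1 or later for the Result type: capture the exception as a value so the worker loop can carry on.

```fsharp
// Hypothetical helper, not part of the gist: run f and capture any
// exception as a value instead of letting it escape the worker loop.
let tryApply (f : 'a -> 'b) (x : 'a) : Result<'b, exn> =
    try
        Ok (f x)
    with ex ->
        Error ex

// Integer division by zero raises DivideByZeroException in .NET,
// so the middle item fails while the other two succeed.
let outcomes = [ 1; 0; 2 ] |> List.map (tryApply (fun x -> 10 / x))

// Count successes and failures without rethrowing anything.
let succeeded =
    outcomes
    |> List.filter (function Ok _ -> true | Error _ -> false)
    |> List.length

let failed = List.length outcomes - succeeded
```

In the gist, the agent could store tryApply f job.WorkItem in resultMap instead of the raw result, leaving the caller to decide what to do with the Error cases. That changes the result type, so treat it as one option rather than a drop-in fix.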

Note that within an async

let! result = async { return f (job.WorkItem) }

is equivalent to

let result = f job.WorkItem

The same goes for

let! request = async { return block.Take() }

which is equivalent to

let request = block.Take ()

In other words, wrapping non-async code with async (unfortunately) does not make it behave asynchronously.
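
A small sketch of that point, using the managed thread ID as a rough probe. The function work is a hypothetical stand-in for any blocking call such as block.Take(); both the wrapped and the direct call should run on whichever thread is already executing the workflow.

```fsharp
open System.Threading

// Stand-in for a blocking call: reports the thread it runs on.
let work () = Thread.CurrentThread.ManagedThreadId

let sameThread =
    async {
        let before = Thread.CurrentThread.ManagedThreadId
        // Wrapped in async: still executed inline by the current thread.
        let! wrapped = async { return work () }
        // Plain call: the form the wrapped version is equivalent to.
        let direct = work ()
        return before = wrapped && wrapped = direct
    }
    |> Async.RunSynchronously
```

So each agent still ties up a thread while it waits in block.Take(); the async wrapper changes the syntax, not the blocking behaviour.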