Skip to content

Instantly share code, notes, and snippets.

@paulbatum
Created December 27, 2016 18:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save paulbatum/6141d96fb01fd241b919804e814c0296 to your computer and use it in GitHub Desktop.
Save paulbatum/6141d96fb01fd241b919804e814c0296 to your computer and use it in GitHub Desktop.
Functions parallelism
using System.Net;
using System.Threading.Tasks.Dataflow;
/// <summary>
/// Strategy used by <c>Run</c> to add the generated messages to the output collector.
/// </summary>
public enum Mode
{
    /// <summary>Add one message at a time, awaiting each add before starting the next.</summary>
    Sequential = 0,

    /// <summary>Add messages in groups, awaiting a whole group before starting the next.</summary>
    Batches = 1,

    /// <summary>Add messages through a TPL Dataflow <c>ActionBlock</c>.</summary>
    Dataflow = 2
}
/// <summary>
/// HTTP entry point that adds <c>count</c> messages (the strings "1" through "count") to
/// <paramref name="queueoutput"/> using the insertion strategy selected by the caller.
/// </summary>
/// <remarks>
/// Query string parameters (names are matched case-insensitively):
/// <list type="bullet">
/// <item><description><c>mode</c>: <c>Sequential</c> (default), <c>Batches</c> or <c>Dataflow</c>; the value is case-insensitive.</description></item>
/// <item><description><c>parallelism</c>: integer &gt;= 1, default 32. Batch size in Batches mode, max degree of parallelism in Dataflow mode; unused in Sequential mode.</description></item>
/// <item><description><c>count</c>: integer &gt;= 0, default 10. Number of messages to add.</description></item>
/// </list>
/// </remarks>
/// <param name="req">Incoming HTTP request whose query string carries the settings above.</param>
/// <param name="queueoutput">Collector that every generated message is added to.</param>
/// <param name="log">Trace writer used to record the effective settings.</param>
/// <returns>
/// 200 OK once every message has been added, or 400 Bad Request when a query parameter
/// is missing a usable value.
/// </returns>
public static async Task<HttpResponseMessage> Run(HttpRequestMessage req, IAsyncCollector<string> queueoutput, TraceWriter log)
{
    // Parse the query string once instead of once per parameter.
    var query = req.GetQueryNameValuePairs().ToList();

    // TryParse + IsDefined rejects unknown names as well as numeric strings such as "7",
    // which Enum.Parse would happily turn into an undefined Mode value.
    Mode mode;
    string modeText = GetQueryValue(query, "mode") ?? "Sequential";
    if (!Enum.TryParse(modeText, true, out mode) || !Enum.IsDefined(typeof(Mode), mode))
    {
        return req.CreateResponse(HttpStatusCode.BadRequest,
            $"Invalid mode '{modeText}'. Expected one of: {string.Join(", ", Enum.GetNames(typeof(Mode)))}.");
    }

    // parallelism must be >= 1: zero or negative values make Take/Skip consume nothing
    // (an endless loop in Batches mode) and are rejected by the Dataflow block options.
    int parallelism;
    string parallelismText = GetQueryValue(query, "parallelism") ?? "32";
    if (!int.TryParse(parallelismText, out parallelism) || parallelism < 1)
    {
        return req.CreateResponse(HttpStatusCode.BadRequest,
            $"Invalid parallelism '{parallelismText}'. Expected an integer >= 1.");
    }

    // A negative count would make Enumerable.Range throw.
    int count;
    string countText = GetQueryValue(query, "count") ?? "10";
    if (!int.TryParse(countText, out count) || count < 0)
    {
        return req.CreateResponse(HttpStatusCode.BadRequest,
            $"Invalid count '{countText}'. Expected an integer >= 0.");
    }

    log.Info($"Inserting {count} item(s). Using mode: {mode}, parallelism: {parallelism}.");

    // Materialize once so batching can slice by index instead of re-running the query.
    List<string> items = Enumerable.Range(1, count).Select(i => i.ToString()).ToList();

    switch (mode)
    {
        case Mode.Dataflow:
        {
            // The block's input queue is unbounded by default, so items can be posted
            // straight to it; this loop is the single producer the option promises.
            var addMessageBlock = new ActionBlock<string>(
                message => queueoutput.AddAsync(message),
                new ExecutionDataflowBlockOptions { SingleProducerConstrained = true, MaxDegreeOfParallelism = parallelism });

            foreach (var item in items)
            {
                addMessageBlock.Post(item);
            }

            addMessageBlock.Complete();
            await addMessageBlock.Completion;
            break;
        }

        case Mode.Batches:
        {
            // Start up to `parallelism` adds at once and wait for the whole batch
            // before starting the next one.
            for (int offset = 0; offset < items.Count; offset += parallelism)
            {
                int size = Math.Min(parallelism, items.Count - offset);
                await Task.WhenAll(items.GetRange(offset, size).Select(item => queueoutput.AddAsync(item)));
            }

            break;
        }

        case Mode.Sequential:
        {
            foreach (var item in items)
            {
                await queueoutput.AddAsync(item);
            }

            break;
        }

        default:
            // Unreachable after the IsDefined check above; kept as a guard for new enum members.
            throw new InvalidOperationException("unrecognized mode");
    }

    return req.CreateResponse(HttpStatusCode.OK);
}

/// <summary>
/// Returns the value of the first query pair whose key equals <paramref name="name"/>
/// (ordinal, case-insensitive), or <c>null</c> when no such key is present.
/// </summary>
private static string GetQueryValue(IEnumerable<KeyValuePair<string, string>> query, string name)
{
    // KeyValuePair is a struct, so FirstOrDefault yields a pair with a null Value on a miss.
    return query.FirstOrDefault(q => string.Equals(q.Key, name, StringComparison.OrdinalIgnoreCase)).Value;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment