Created
December 27, 2016 18:59
-
-
Save paulbatum/6141d96fb01fd241b919804e814c0296 to your computer and use it in GitHub Desktop.
Functions parallelism
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Net; | |
using System.Threading.Tasks.Dataflow; | |
public enum Mode | |
{ | |
Sequential, | |
Batches, | |
Dataflow | |
} | |
public static async Task<HttpResponseMessage> Run(HttpRequestMessage req, IAsyncCollector<string> queueoutput, TraceWriter log) | |
{ | |
Mode mode = (Mode) Enum.Parse(typeof(Mode), req.GetQueryNameValuePairs() | |
.FirstOrDefault(q => string.Compare(q.Key, "mode", true) == 0) | |
.Value ?? "Sequential"); | |
int parallelism = int.Parse(req.GetQueryNameValuePairs() | |
.FirstOrDefault(q => string.Compare(q.Key, "parallelism", true) == 0) | |
.Value ?? "32"); | |
int count = int.Parse(req.GetQueryNameValuePairs() | |
.FirstOrDefault(q => string.Compare(q.Key, "count", true) == 0) | |
.Value ?? "10"); | |
log.Info($"Inserting {count} items(s). Using mode: {mode}, parallelism: {parallelism}."); | |
var items = Enumerable.Range(1, count).Select(i => i.ToString()); | |
if(mode == Mode.Dataflow) | |
{ | |
var addMessageBlock = new ActionBlock<string>(async message => | |
{ | |
await queueoutput.AddAsync(message); | |
}, new ExecutionDataflowBlockOptions { SingleProducerConstrained = true, MaxDegreeOfParallelism = parallelism }); | |
var bufferBlock = new BufferBlock<string>(); | |
bufferBlock.LinkTo(addMessageBlock, new DataflowLinkOptions { PropagateCompletion = true }); | |
foreach(var i in items) | |
bufferBlock.Post(i); | |
bufferBlock.Complete(); | |
await addMessageBlock.Completion; | |
} | |
else if (mode == Mode.Batches) | |
{ | |
while(items.Any()) | |
{ | |
var batch = items.Take(parallelism); | |
items = items.Skip(parallelism); | |
await Task.WhenAll(batch.Select(i => queueoutput.AddAsync(i))); | |
} | |
} | |
else if (mode == Mode.Sequential) | |
{ | |
foreach(var i in items) | |
await queueoutput.AddAsync(i); | |
} | |
else | |
{ | |
throw new Exception("unrecognized mode"); | |
} | |
return req.CreateResponse(HttpStatusCode.OK); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment