Skip to content

Instantly share code, notes, and snippets.

@sgoguen
Created January 11, 2017 21:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sgoguen/e7c720dc81a3e360695e3e0f65071519 to your computer and use it in GitHub Desktop.
Save sgoguen/e7c720dc81a3e360695e3e0f65071519 to your computer and use it in GitHub Desktop.
// Script entry point: builds a throttled version of a slow async job and
// dumps the (lazily produced) sequence of tasks for inputs 1..100.
void Main() {
    // Each job waits 1, 2 or 3 seconds depending on x, then reports both values.
    var throttledJob = CreatePipeLineFunc(async (int x) => {
        int delay = 1000 * (1 + x % 3);
        await Task.Delay(delay);
        return new { x, delay };
    }, maxDegreeOfParallelism: 10);

    // Deferred query: the jobs are only started once Dump() enumerates it.
    var pendingJobs = from m in Enumerable.Range(1, 100)
                      select throttledJob(m);
    pendingJobs.Dump();
}
/// <summary>
/// Turns an async function into a function whose invocations are funneled
/// through a shared <see cref="ActionBlock{TInput}"/>, so that at most
/// <paramref name="maxDegreeOfParallelism"/> calls to <paramref name="f"/>
/// are in flight at any one time.
/// </summary>
/// <typeparam name="T">Argument type of the wrapped function.</typeparam>
/// <typeparam name="U">Result type of the wrapped function.</typeparam>
/// <param name="f">The async function to throttle.</param>
/// <param name="maxDegreeOfParallelism">
/// Used both as the block's <c>MaxDegreeOfParallelism</c> and as its
/// <c>BoundedCapacity</c>; defaults to 1 (fully serialized).
/// </param>
/// <returns>
/// A function with the same signature as <paramref name="f"/>. Its task
/// completes with the result of <paramref name="f"/>, or faults with the
/// exception <paramref name="f"/> produced.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="f"/> is null.</exception>
Func<T, Task<U>> CreatePipeLineFunc<T, U>(Func<T, Task<U>> f, int maxDegreeOfParallelism = 1) {
    if (f == null) {
        throw new ArgumentNullException(nameof(f));
    }

    var pipeLine = new ActionBlock<Task<Task<U>>>(async trigger => {
        // The trigger is a cold task; starting it is what actually invokes f.
        trigger.Start();
        try {
            // Hold this processing slot until the wrapped call has finished.
            var task = await trigger;
            await task;
        } catch (Exception) {
            // Deliberately swallowed: the caller awaits the very same tasks and
            // observes the failure there. Letting it escape would fault the
            // block, after which every later call would hang forever.
        }
    }, new ExecutionDataflowBlockOptions {
        BoundedCapacity = maxDegreeOfParallelism,
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    });

    return async x => {
        // Cold task wrapping the call; it only runs once the block starts it.
        var trigger = new Task<Task<U>>(() => f(x));

        // SendAsync waits asynchronously until the bounded block accepts the
        // message; false means the block will never take it.
        var accepted = await pipeLine.SendAsync(trigger);
        if (!accepted) {
            throw new InvalidOperationException("The pipeline declined the work item.");
        }

        // Outer await: f has been invoked. Inner await: f's task has finished.
        return await await trigger;
    };
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment