Skip to content

Instantly share code, notes, and snippets.

Created October 16, 2019 18:22
Show Gist options
  • Save NickLarsen/d7286c3aa631e5f1eaf459976631e765 to your computer and use it in GitHub Desktop.
Save NickLarsen/d7286c3aa631e5f1eaf459976631e765 to your computer and use it in GitHub Desktop.
// Benchmark entry point: builds one set of simulated latencies and runs it
// through each piping strategy, printing the wall-clock time of every run.
async Task Main()
{
    // Roughly 3ms to 35ms: normal(mean 10, stdDev 10) samples clamped to a 3 ms floor.
    // Materialized with ToArray() so the query is evaluated exactly once; a deferred
    // query would pull fresh values from the shared Random on every enumeration and
    // hand each strategy a different workload.
    var times = Enumerable.Range(0, 1_000)
        .Select(m =>
        {
            var randNorm = RandomNormal(10d, 10d);
            return Math.Max(randNorm, 3d);
        })
        .ToArray();

    // the timing function appears to be good to within about 0.25 ms on average
    // var actuals = times
    // .AsParallel()
    // .Select(async expected => new { expected, actual = await JustWait(expected).ConfigureAwait(false) })
    // .ToArray();
    // await Task.WhenAll(actuals);
    // actuals.Select(m => m.Result.actual - m.Result.expected).Average().Dump("Average deviance");
    // actuals.Dump();

    const int bufferSize = 10;
    Stopwatch timer;

    timer = Stopwatch.StartNew();
    await PipeContinue(times, bufferSize, JustWait);
    Console.WriteLine($"PipeContinue: {timer.Elapsed.TotalMilliseconds:F1} ms");

    timer = Stopwatch.StartNew();
    await PipeContinueBuffer(times, bufferSize, JustWait);
    Console.WriteLine($"PipeContinueBuffer: {timer.Elapsed.TotalMilliseconds:F1} ms");

    timer = Stopwatch.StartNew();
    await Pipe(times, bufferSize, JustWait);
    Console.WriteLine($"Pipe: {timer.Elapsed.TotalMilliseconds:F1} ms");
}
// Simulated unit of work: asynchronously waits for the requested number of
// milliseconds (fraction truncated by the int cast) and returns how long the
// wait actually took, in milliseconds, as measured by a Stopwatch.
async Task<double> JustWait(double ms)
{
    var stopwatch = new Stopwatch();
    stopwatch.Start();

    var wholeMilliseconds = (int)ms;
    await Task.Delay(wholeMilliseconds);

    // A busy-wait alternative (spin on the stopwatch until `ms` has elapsed and
    // return Task.FromResult(elapsed)) was tried here and left disabled.
    return stopwatch.Elapsed.TotalMilliseconds;
}
// Single generator with a fixed seed; reuse this one instance when drawing many values.
Random rand = new Random(20191016);

// Draws one sample from a normal distribution with the given mean and standard
// deviation, using the sine branch of the Box-Muller transform.
double RandomNormal(double mean, double stdDev)
{
    // 1 - NextDouble() maps [0,1) onto (0,1], so Math.Log never sees zero.
    double radiusUniform = 1.0 - rand.NextDouble();
    double angleUniform = 1.0 - rand.NextDouble();

    double radius = Math.Sqrt(-2.0 * Math.Log(radiusUniform));
    double angle = 2.0 * Math.PI * angleUniform;
    double standardNormal = radius * Math.Sin(angle); // normal(0, 1)

    // Shift and scale to normal(mean, stdDev^2).
    return mean + stdDev * standardNormal;
}
// Define other methods and classes here
// Runs `action` over every item of `source` from a single producer loop while
// keeping a ring of `pipeLength` outstanding tasks.
//
// Each new task is started first and only then is the task occupying its ring
// slot awaited, so up to pipeLength + 1 tasks can be in flight at once.
// A task that is already complete when `action` returns is awaited right away
// (surfacing any error immediately) and never takes a slot.
// After the source is exhausted the remaining tasks are awaited oldest first.
//
// Faults (reported through the returned task):
//   ArgumentNullException       - source or action is null
//   ArgumentOutOfRangeException - pipeLength is less than 1
//   anything thrown by `action` or by the tasks it returns
static async Task Pipe<T>(IEnumerable<T> source, int pipeLength, Func<T, Task> action)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (action == null) throw new ArgumentNullException(nameof(action));
    if (pipeLength < 1)
    {
        // Previously this surfaced as DivideByZeroException / OverflowException.
        throw new ArgumentOutOfRangeException(nameof(pipeLength), pipeLength, "Pipe length must be at least 1.");
    }

    var buffer = new Task[pipeLength];
    var next = 0; // ring position of the oldest slot, which is the next one to reuse

    foreach (var item in source)
    {
        //Console.WriteLine("task starting");
        var newTask = action(item);
        if (newTask.IsCompleted)
        {
            // completed synchronously; check for error and move on
            await newTask;
            continue;
        }

        var oldTask = buffer[next];
        if (oldTask != null) await oldTask;
        buffer[next] = newTask;
        next = (next + 1) % pipeLength; // wraps in place, so the counter can never overflow
    }

    // Drain whatever is still in flight, starting from the oldest slot.
    for (int i = 0; i < pipeLength; i++)
    {
        var oldTask = buffer[(next + i) % pipeLength];
        if (oldTask != null) await oldTask;
    }
}
// Runs `action` over every item of `source` using `maxUncompletedTasks` worker
// loops that pull one item at a time from a single shared enumerator.
// Each worker awaits its current action before taking the next item, so at most
// `maxUncompletedTasks` actions are outstanding at any time.
//
// Returns the Task.WhenAll of the workers: it completes when every worker has
// stopped and carries the exceptions of any workers that faulted. A faulted
// worker stops pulling items; the other workers keep going.
//
// Throws synchronously:
//   ArgumentNullException       - source or action is null
//   ArgumentOutOfRangeException - maxUncompletedTasks is less than 1
static Task PipeContinue<T>(IEnumerable<T> source, int maxUncompletedTasks, Func<T, Task> action)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (action == null) throw new ArgumentNullException(nameof(action));
    if (maxUncompletedTasks < 1)
    {
        // Zero workers used to "succeed" without processing a single item.
        throw new ArgumentOutOfRangeException(nameof(maxUncompletedTasks), maxUncompletedTasks, "At least one worker is required.");
    }

    //long processed = 0;
    //var nextItems = new AnyNullable<T>[maxUncompletedTasks]
    var queue = source.GetEnumerator();
    var gate = new object();                 // guards queue, exhausted and activeWorkers
    var exhausted = false;                   // set once MoveNext has returned false
    var activeWorkers = maxUncompletedTasks; // workers that have not finished yet

    // Takes the next item under the lock; returns false once the source has ended.
    bool TryTake(out T item)
    {
        lock (gate)
        {
            // The flag keeps late workers from calling MoveNext after the end.
            if (!exhausted && queue.MoveNext())
            {
                item = queue.Current;
                return true;
            }

            exhausted = true;
            item = default(T);
            return false;
        }
    }

    async Task TakeWork()
    {
        try
        {
            while (TryTake(out var item))
            {
                await action(item).ConfigureAwait(false);
            }
        }
        finally
        {
            // The last worker to stop (normally or by faulting) releases the
            // enumerator, so it is disposed even when the source was not drained.
            lock (gate)
            {
                if (--activeWorkers == 0) queue.Dispose();
            }
        }
    }

    var workers = Enumerable.Range(0, maxUncompletedTasks)
        .Select(_ => TakeWork())
        .ToArray();
    return Task.WhenAll(workers);
}
// Runs `action` over every item of `source` using `maxUncompletedTasks` worker
// loops that share one enumerator, like PipeContinue, but each worker takes up
// to `batchSize` items per trip through the lock and then processes that batch
// one item at a time. Fewer lock acquisitions, at the cost of coarser work
// distribution: a short source may be claimed entirely by the first workers.
//
// `batchSize` defaults to 100, the value that used to be hard-coded.
//
// Returns the Task.WhenAll of the workers: it completes when every worker has
// stopped and carries the exceptions of any workers that faulted. A faulted
// worker abandons the rest of its batch; the other workers keep going.
//
// Throws synchronously:
//   ArgumentNullException       - source or action is null
//   ArgumentOutOfRangeException - maxUncompletedTasks or batchSize is less than 1
static Task PipeContinueBuffer<T>(IEnumerable<T> source, int maxUncompletedTasks, Func<T, Task> action, int batchSize = 100)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (action == null) throw new ArgumentNullException(nameof(action));
    if (maxUncompletedTasks < 1)
    {
        // Zero workers used to "succeed" without processing a single item.
        throw new ArgumentOutOfRangeException(nameof(maxUncompletedTasks), maxUncompletedTasks, "At least one worker is required.");
    }
    if (batchSize < 1)
    {
        throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be at least 1.");
    }

    //long processed = 0;
    //var nextItems = new AnyNullable<T>[maxUncompletedTasks]
    var queue = source.GetEnumerator();
    var gate = new object();                 // guards queue, exhausted and activeWorkers
    var exhausted = false;                   // set once MoveNext has returned false
    var activeWorkers = maxUncompletedTasks; // workers that have not finished yet

    // Copies up to batch.Length items into `batch` under the lock and returns
    // how many were copied; 0 means the source has ended.
    int FillBatch(T[] batch)
    {
        lock (gate)
        {
            var count = 0;
            // The flag keeps late workers from calling MoveNext after the end.
            while (!exhausted && count < batch.Length)
            {
                if (queue.MoveNext())
                {
                    batch[count++] = queue.Current;
                }
                else
                {
                    exhausted = true;
                }
            }
            return count;
        }
    }

    async Task TakeWork()
    {
        try
        {
            var batch = new T[batchSize];
            int count;
            while ((count = FillBatch(batch)) > 0)
            {
                for (int j = 0; j < count; j++)
                {
                    await action(batch[j]).ConfigureAwait(false);
                }
            }
        }
        finally
        {
            // The last worker to stop (normally or by faulting) releases the
            // enumerator, so it is disposed even when the source was not drained.
            lock (gate)
            {
                if (--activeWorkers == 0) queue.Dispose();
            }
        }
    }

    var workers = Enumerable.Range(0, maxUncompletedTasks)
        .Select(_ => TakeWork())
        .ToArray();
    return Task.WhenAll(workers);
}
// Simple optional-value holder usable with any T: a presence flag plus the
// value itself. The two members are independent - assigning Value does not
// change HasValue, and a default instance has HasValue == false and
// Value == default(T).
struct AnyNullable<T>
{
    private bool _hasValue;
    private T _value;

    // True when the holder is considered to contain a value.
    public bool HasValue
    {
        get { return _hasValue; }
        set { _hasValue = value; }
    }

    // The stored value.
    public T Value
    {
        get { return _value; }
        set { _value = value; }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment