Skip to content

Instantly share code, notes, and snippets.

@NickLarsen
Created October 16, 2019 18:22
Show Gist options
  • Save NickLarsen/d7286c3aa631e5f1eaf459976631e765 to your computer and use it in GitHub Desktop.
Save NickLarsen/d7286c3aa631e5f1eaf459976631e765 to your computer and use it in GitHub Desktop.
/// <summary>
/// Script entry point: generates a fixed set of simulated wait times and measures how long each of the
/// three pipelining strategies (<c>PipeContinue</c>, <c>PipeContinueBuffer</c>, <c>Pipe</c>) takes to
/// push all of them through <c>JustWait</c>.
/// </summary>
async Task Main()
{
    //Environment.Version.Dump();
    Util.CreateSynchronizationContext();

    const int sampleCount = 1_000;
    const int bufferSize = 10;

    // Wait times in milliseconds: normal(mean 10, stdDev 10) with a floor of 3 ms
    // (the original author's note describes the resulting range as "3ms to 35ms").
    var times = new double[sampleCount];
    for (var i = 0; i < times.Length; i++)
    {
        times[i] = Math.Max(RandomNormal(10d, 10d), 3d);
    }

    // Total simulated work, i.e. how long a strictly sequential run would take.
    times.Sum().Dump();

    // the timing function appears to be good to within about 0.25 ms on average
    // var actuals = times
    // .AsParallel()
    // .Select(async expected => new { expected, actual = await JustWait(expected).ConfigureAwait(false) })
    // .ToArray();
    // await Task.WhenAll(actuals);
    //
    // actuals.Select(m => m.Result.actual - m.Result.expected).Average().Dump("Average deviance");
    // actuals.Dump();

    // Runs one strategy under a stopwatch and dumps the elapsed time twice under the same label:
    // once as a TimeSpan and once as raw milliseconds.
    async Task Measure(string label, Func<Task> run)
    {
        var timer = Stopwatch.StartNew();
        await run();
        timer.Stop();
        timer.Elapsed.Dump(label);
        timer.Elapsed.TotalMilliseconds.Dump(label);
    }

    await Measure(nameof(PipeContinue), () => PipeContinue(times, bufferSize, JustWait));
    await Measure(nameof(PipeContinueBuffer), () => PipeContinueBuffer(times, bufferSize, JustWait));
    await Measure(nameof(Pipe), () => Pipe(times, bufferSize, JustWait));
}
/// <summary>
/// Asynchronously waits for about <paramref name="ms"/> milliseconds and reports how long the wait
/// actually took.
/// </summary>
/// <param name="ms">Requested delay in milliseconds; the fractional part is dropped by the cast to <c>int</c>.</param>
/// <returns>The measured wall-clock duration of the wait, in milliseconds.</returns>
async Task<double> JustWait(double ms)
{
    var stopwatch = Stopwatch.StartNew();

    // Task.Delay only accepts whole milliseconds here, hence the truncating cast.
    var delay = Task.Delay((int)ms);
    await delay;

    return stopwatch.Elapsed.TotalMilliseconds;

    // Busy-wait alternative kept from the original for reference:
    // while (true)
    // {
    // var elapsed = sw.Elapsed.TotalMilliseconds;
    // if (elapsed >= ms) return Task.FromResult(elapsed);
    // }
}
// Single shared generator with a fixed seed, so every run of the script draws the same sequence.
Random rand = new Random(20191016);

/// <summary>
/// Draws one normally distributed sample using the Box-Muller transform
/// (see https://stackoverflow.com/a/218600).
/// </summary>
/// <param name="mean">Mean of the target distribution.</param>
/// <param name="stdDev">Standard deviation of the target distribution.</param>
/// <returns>A sample from normal(<paramref name="mean"/>, <paramref name="stdDev"/>^2).</returns>
double RandomNormal(double mean, double stdDev)
{
    // NextDouble() is in [0,1); flipping it gives (0,1], which keeps Math.Log away from zero.
    double uniformA = 1.0 - rand.NextDouble();
    double uniformB = 1.0 - rand.NextDouble();

    // Box-Muller: radius from the first uniform, angle from the second -> standard normal(0,1).
    double radius = Math.Sqrt(-2.0 * Math.Log(uniformA));
    double angleTerm = Math.Sin(2.0 * Math.PI * uniformB);
    double standardNormal = radius * angleTerm;

    // Shift and scale to the requested distribution.
    return mean + stdDev * standardNormal;
}
// Define other methods and classes here
/// <summary>
/// Runs <paramref name="action"/> for every item of <paramref name="source"/>, in order, keeping the
/// resulting tasks in a ring buffer of <paramref name="pipeLength"/> slots so several can be in flight
/// at once.
/// </summary>
/// <remarks>
/// A new task is started before the oldest buffered task is awaited, so up to
/// <paramref name="pipeLength"/> + 1 tasks may be running at the same moment. Tasks that are already
/// complete when <paramref name="action"/> returns do not occupy a slot. If a task fails, the exception
/// propagates immediately and tasks still sitting in the buffer are not awaited.
/// </remarks>
/// <typeparam name="T">Type of the work items.</typeparam>
/// <param name="source">Items to process.</param>
/// <param name="pipeLength">Number of slots in the ring buffer; must be at least 1.</param>
/// <param name="action">Asynchronous operation to start for each item.</param>
/// <returns>A task that completes once every started task has completed.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="action"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="pipeLength"/> is less than 1.</exception>
static async Task Pipe<T>(IEnumerable<T> source, int pipeLength, Func<T, Task> action)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (action == null) throw new ArgumentNullException(nameof(action));
    if (pipeLength < 1)
    {
        // Previously 0 surfaced as DivideByZeroException and negatives as OverflowException.
        throw new ArgumentOutOfRangeException(nameof(pipeLength), pipeLength, "The pipe needs at least one slot.");
    }

    var buffer = new Task[pipeLength];

    // Ring cursor: always points at the oldest slot, which is also the next one to be reused.
    // Kept within [0, pipeLength) so it can never overflow and skip to an unrelated slot.
    var oldest = 0;

    foreach (var item in source)
    {
        var newTask = action(item);
        if (newTask.IsCompleted)
        {
            // Completed synchronously. Awaiting a finished task does not block or yield, and it
            // rethrows the task's own exception instead of wrapping it in AggregateException,
            // which matches how failures of buffered tasks surface below.
            await newTask;
        }
        else
        {
            var slot = oldest;
            oldest = oldest + 1 == pipeLength ? 0 : oldest + 1;

            // Make room by waiting for whatever currently occupies the slot.
            var oldTask = buffer[slot];
            if (oldTask != null) await oldTask;
            buffer[slot] = newTask;
        }
    }

    // Drain: await the remaining tasks, oldest first.
    for (var i = 0; i < pipeLength; i++)
    {
        var pending = buffer[oldest];
        if (pending != null) await pending;
        oldest = oldest + 1 == pipeLength ? 0 : oldest + 1;
    }
}
/// <summary>
/// Processes <paramref name="source"/> with <paramref name="maxUncompletedTasks"/> concurrent workers.
/// Each worker repeatedly pulls the next item from a shared enumerator and awaits
/// <paramref name="action"/> on it until the enumerator is exhausted.
/// </summary>
/// <remarks>
/// The shared enumerator is disposed once every worker has finished. A worker that fails stops pulling
/// items, but the other workers keep going. If several workers fail, the returned task carries the
/// first failure only.
/// </remarks>
/// <typeparam name="T">Type of the work items.</typeparam>
/// <param name="source">Items to process.</param>
/// <param name="maxUncompletedTasks">Number of workers, and therefore the maximum number of in-flight actions; must be at least 1.</param>
/// <param name="action">Asynchronous operation to run for each item.</param>
/// <returns>A task that completes when all workers have finished.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="action"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxUncompletedTasks"/> is less than 1.</exception>
static Task PipeContinue<T>(IEnumerable<T> source, int maxUncompletedTasks, Func<T, Task> action)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (action == null) throw new ArgumentNullException(nameof(action));
    if (maxUncompletedTasks < 1)
    {
        // Zero workers used to return a completed task without processing a single item.
        throw new ArgumentOutOfRangeException(nameof(maxUncompletedTasks), maxUncompletedTasks, "At least one worker is required.");
    }

    return RunWorkers();

    async Task RunWorkers()
    {
        // Dedicated lock object, so synchronization does not depend on what the enumerator instance is.
        var gate = new object();

        // The using block releases the enumerator after all workers are done.
        using (var queue = source.GetEnumerator())
        {
            async Task TakeWork()
            {
                while (true)
                {
                    T item;
                    lock (gate)
                    {
                        // MoveNext and Current must be read as one atomic step across workers.
                        if (!queue.MoveNext()) break;
                        item = queue.Current;
                    }
                    await action(item).ConfigureAwait(false);
                }

                // Diagnostic output: the thread this worker finished on.
                Thread.CurrentThread.ManagedThreadId.Dump();
            }

            var workers = Enumerable.Range(0, maxUncompletedTasks)
                .Select(_ => TakeWork())
                .ToArray();

            await Task.WhenAll(workers).ConfigureAwait(false);
        }
    }
}
/// <summary>
/// Processes <paramref name="source"/> with <paramref name="maxUncompletedTasks"/> concurrent workers.
/// Each worker takes items from a shared enumerator in batches of up to 100 (one lock acquisition per
/// batch) and then awaits <paramref name="action"/> on the batch's items one after another.
/// </summary>
/// <remarks>
/// The shared enumerator is disposed once every worker has finished. A worker stops after the first
/// batch that comes back short, since that means the enumerator ran out. If several workers fail, the
/// returned task carries the first failure only.
/// </remarks>
/// <typeparam name="T">Type of the work items.</typeparam>
/// <param name="source">Items to process.</param>
/// <param name="maxUncompletedTasks">Number of workers, and therefore the maximum number of in-flight actions; must be at least 1.</param>
/// <param name="action">Asynchronous operation to run for each item.</param>
/// <returns>A task that completes when all workers have finished.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="action"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxUncompletedTasks"/> is less than 1.</exception>
static Task PipeContinueBuffer<T>(IEnumerable<T> source, int maxUncompletedTasks, Func<T, Task> action)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (action == null) throw new ArgumentNullException(nameof(action));
    if (maxUncompletedTasks < 1)
    {
        // Zero workers used to return a completed task without processing a single item.
        throw new ArgumentOutOfRangeException(nameof(maxUncompletedTasks), maxUncompletedTasks, "At least one worker is required.");
    }

    return RunWorkers();

    async Task RunWorkers()
    {
        // Dedicated lock object, so synchronization does not depend on what the enumerator instance is.
        var gate = new object();

        // The using block releases the enumerator after all workers are done.
        using (var queue = source.GetEnumerator())
        {
            async Task TakeWork()
            {
                const int bufferSize = 100;
                var buffer = new T[bufferSize];
                while (true)
                {
                    // Fill the private batch under the lock; 'taken' is how many items were obtained.
                    int taken;
                    lock (gate)
                    {
                        for (taken = 0; taken < bufferSize; taken++)
                        {
                            if (!queue.MoveNext()) break;
                            buffer[taken] = queue.Current;
                        }
                    }

                    // Work through the batch outside the lock, one item at a time.
                    for (int j = 0; j < taken; j++)
                    {
                        await action(buffer[j]).ConfigureAwait(false);
                    }

                    // A short batch means the enumerator is exhausted.
                    if (taken < bufferSize) break;
                }

                // Diagnostic output: the thread this worker finished on.
                Thread.CurrentThread.ManagedThreadId.Dump();
            }

            var workers = Enumerable.Range(0, maxUncompletedTasks)
                .Select(_ => TakeWork())
                .ToArray();

            await Task.WhenAll(workers).ConfigureAwait(false);
        }
    }
}
/// <summary>
/// Pairs a value of any type <typeparamref name="T"/> with a flag, in the spirit of
/// <c>Nullable&lt;T&gt;</c> but without the value-type constraint.
/// No live code in the visible file uses this type.
/// </summary>
struct AnyNullable<T>
{
/// <summary>Flag accompanying <see cref="Value"/>; presumably true when a value has been stored (nothing visible sets or reads it).</summary>
public bool HasValue { get; set; }
/// <summary>The stored value; <c>default(T)</c> until assigned.</summary>
public T Value { get; set; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment