Skip to content

Instantly share code, notes, and snippets.

@alexyakunin
Last active July 31, 2019 23:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alexyakunin/0e69ef719f378f24bfcc8ce6f052e532 to your computer and use it in GitHub Desktop.
Save alexyakunin/0e69ef719f378f24bfcc8ce6f052e532 to your computer and use it in GitHub Desktop.
/// <summary>
/// Streams the file <paramref name="fileName"/> through a <see cref="Pipe"/> and folds every
/// chunk that comes out of it into a running total by calling <paramref name="sumComputer"/>.
/// </summary>
/// <param name="fileName">Path of the file to read. It is opened read-only.</param>
/// <param name="sumComputer">
/// Callback invoked once per memory segment as <c>(segment, sum, n)</c>; it returns the updated
/// <c>(sum, n)</c> pair, which is passed to the next invocation.
/// </param>
/// <param name="ct">Token passed to the file copy and to every pipe read.</param>
/// <returns><c>sum + n</c> as left by the last <paramref name="sumComputer"/> call.</returns>
/// <exception cref="ArgumentNullException"><paramref name="sumComputer"/> is null.</exception>
public static async Task<long> ComputeSumAsync(string fileName,
    Func<ReadOnlyMemory<byte>, long, int, (long, int)> sumComputer,
    CancellationToken ct = default)
{
    if (sumComputer == null) {
        throw new ArgumentNullException(nameof(sumComputer));
    }

    // The file is only ever read, so ask for read access explicitly: the two-argument
    // FileStream constructor requests ReadWrite and fails on read-only files.
    await using var fs = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read);
    var pipe = new Pipe(new PipeOptions(
        minimumSegmentSize: MinBufferSize,
        useSynchronizationContext: false));

    // Producer: copies the whole file into the pipe, then completes the writer.
    async Task ProduceAsync() {
        try {
            await fs.CopyToAsync(pipe.Writer, ct).ConfigureAwait(false);
        }
        catch (Exception e) {
            // Without this the writer would stay open after a failed copy and the consumer
            // would wait in ReadAsync forever, so Task.WhenAll below would never finish.
            pipe.Writer.Complete(e);
            throw;
        }
        pipe.Writer.Complete();
    }

    // Consumer: reads from the pipe and feeds each segment to sumComputer.
    async Task<long> ConsumeAsync() {
        try {
            var sum = 0L;
            var n = 0;
            // Length of the chunk handed to sumComputer on the previous iteration. That chunk
            // was only marked "examined", not "consumed", so it is skipped at the front of the
            // next buffer.
            var lastTimeProcessed = 0L;
            var readTask = pipe.Reader.ReadAsync(ct);
            while (true) {
                var result = await readTask.ConfigureAwait(false);
                var buffer = result.Buffer;
                var toProcess = buffer.Slice(lastTimeProcessed);
                lastTimeProcessed = toProcess.Length;
                if (toProcess.IsEmpty && (result.IsCompleted || result.IsCanceled)) {
                    break;
                }
                // Consume everything before the new chunk (the previous chunk) and mark the new
                // chunk as examined only, so that the next read can be started before the chunk
                // is processed.
                // NOTE(review): toProcess is used after AdvanceTo; this relies on the pipe
                // keeping unconsumed memory intact, which the PipeReader contract does not
                // promise - confirm against the System.IO.Pipelines version in use.
                pipe.Reader.AdvanceTo(toProcess.Start, toProcess.End);
                readTask = pipe.Reader.ReadAsync(ct); // We can start it right now to read while compute
                foreach (var segment in toProcess) {
                    (sum, n) = sumComputer(segment, sum, n);
                }
            }
            // n holds whatever sumComputer left unflushed after the last segment.
            return sum + n;
        }
        finally {
            pipe.Reader.Complete();
        }
    }

    var (produceTask, consumeTask) = (ProduceAsync(), ConsumeAsync());
    await Task.WhenAll(produceTask, consumeTask).ConfigureAwait(false);
    // consumeTask has already completed here; await it instead of reading .Result.
    return await consumeTask.ConfigureAwait(false);
}
/// <summary>
/// Folds the bytes of <paramref name="buffer"/> into the running
/// (<paramref name="sum"/>, <paramref name="n"/>) state, sixteen bytes per loop iteration.
/// </summary>
/// <remarks>
/// For every byte <c>b</c>: <c>n = (b &amp; 127) + (n &lt;&lt; 7)</c>; when the high bit of
/// <c>b</c> is set, <c>n</c> is added to <c>sum</c> and reset to zero.
/// </remarks>
/// <param name="buffer">Bytes to process; may be empty.</param>
/// <param name="sum">Total accumulated so far.</param>
/// <param name="n">Partially accumulated value carried over from the previous buffer.</param>
/// <returns>The updated <c>(sum, n)</c> pair.</returns>
private static unsafe (long, int) ComputeSumUnsafeUnrolled(
    ReadOnlyMemory<byte> buffer, long sum, int n)
{
    var span = buffer.Span;
    fixed (byte* pStart = span) {
        var pEnd = pStart + span.Length;
        // End of the last complete 16-byte group. Computed by adding to pStart rather than
        // subtracting from pEnd: for an empty span pStart is null, and "pEnd - 15" would wrap
        // around to a huge address and let the loop below dereference null.
        var pEnd16 = pStart + (span.Length & ~15);
        var p = pStart;
        while (p < pEnd16) {
            // All sixteen bytes of the group must be folded, in order.
            AccumulateByte(p[0], ref sum, ref n);
            AccumulateByte(p[1], ref sum, ref n);
            AccumulateByte(p[2], ref sum, ref n);
            AccumulateByte(p[3], ref sum, ref n);
            AccumulateByte(p[4], ref sum, ref n);
            AccumulateByte(p[5], ref sum, ref n);
            AccumulateByte(p[6], ref sum, ref n);
            AccumulateByte(p[7], ref sum, ref n);
            AccumulateByte(p[8], ref sum, ref n);
            AccumulateByte(p[9], ref sum, ref n);
            AccumulateByte(p[10], ref sum, ref n);
            AccumulateByte(p[11], ref sum, ref n);
            AccumulateByte(p[12], ref sum, ref n);
            AccumulateByte(p[13], ref sum, ref n);
            AccumulateByte(p[14], ref sum, ref n);
            AccumulateByte(p[15], ref sum, ref n);
            p += 16;
        }
        // Remaining 0..15 bytes.
        while (p < pEnd) {
            AccumulateByte(*p++, ref sum, ref n);
        }
    }
    return (sum, n);
}

/// <summary>
/// Folds one byte into the running state: shifts its low seven bits into <paramref name="n"/>
/// and, when the high bit is set, adds <paramref name="n"/> to <paramref name="sum"/> and
/// resets it to zero.
/// </summary>
[System.Runtime.CompilerServices.MethodImpl(
    System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
private static void AccumulateByte(byte b, ref long sum, ref int n)
{
    n = (b & 127) + (n << 7);
    if ((b & 128) != 0) {
        sum += n;
        n = 0;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment