Skip to content

Instantly share code, notes, and snippets.

@AlgorithmsAreCool
Last active December 20, 2020 03:02
Show Gist options
  • Save AlgorithmsAreCool/3a122f12493687a06ebc6ab2b5385d2b to your computer and use it in GitHub Desktop.
Save AlgorithmsAreCool/3a122f12493687a06ebc6ab2b5385d2b to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace BatchedAwait
{
/// <summary>
/// Program entry point: prints a greeting, then runs one <see cref="Tester"/>
/// pass and waits for it to finish.
/// </summary>
public static class EntryPoint
{
    static async Task Main(string[] args)
    {
        // Number of requests pushed through the batching pipeline in one run.
        const int messageCount = 10_000_000;

        Console.WriteLine("Hello World!");
        await new Tester(messageCount).Startup();
    }
}
/// <summary>
/// Load-test driver: queues <see cref="TestMessageCount"/> requests onto the thread pool,
/// routes each one through a shared <see cref="BatchManager"/>, and prints throughput
/// figures once every request has completed.
/// </summary>
/// <param name="TestMessageCount">Number of requests to issue during <see cref="Startup"/>.</param>
public record Tester(int TestMessageCount)
{
    private Stopwatch stopwatch { get; } = new();

    // Continuations are forced off the completing thread so that awaiting code never
    // runs inline inside FinalReport's SetResult call.
    private TaskCompletionSource FinalGate { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously);

    // Outstanding work: one "hold" owned by the producer loop plus one per queued request.
    private int RequestsInFlight = 0;
    private int CompletedCount = 0;

    /// <summary>State handed to each thread-pool work item.</summary>
    public record WorkUnit(BatchManager BatchManager, RequestItem Request);

    /// <summary>
    /// Queues all requests and returns a task that completes after the final report
    /// has been printed.
    /// </summary>
    /// <returns>A task that completes when every queued request has finished.</returns>
    public async Task Startup()
    {
        Action<WorkUnit> cachedDelegate = arg => Work(arg);

        // Disposed when this method finishes so the manager's background loop is told to stop.
        using var batchManager = new BatchManager();

        stopwatch.Start();

        // Producer hold: keeps the counter above zero until every request is queued.
        // Without it, early requests can complete while the loop is still running, drive
        // the counter to zero, and trigger the final report prematurely (and more than once).
        Interlocked.Increment(ref RequestsInFlight);

        for (int i = 0; i < TestMessageCount; i++) {
            var req = new RequestItem(i);
            var workUnit = new WorkUnit(batchManager, req);
            Interlocked.Increment(ref RequestsInFlight);
            ThreadPool.QueueUserWorkItem(cachedDelegate, workUnit, false);
        }

        // Drop the producer hold. When TestMessageCount is zero this is the only
        // decrement, so the report still runs and the returned task still completes.
        ReleaseInFlight();

        await FinalGate.Task;
    }

    /// <summary>
    /// Thread-pool callback: submits one request and arranges for <see cref="Next"/>
    /// to run when its response task completes.
    /// </summary>
    private void Work(WorkUnit workUnit)
    {
        var response = workUnit.BatchManager.SendSpecificRequest(workUnit.Request);
        var awaiter = response.GetAwaiter();

        // The awaiter's result is never read, so a faulted or cancelled response is
        // still counted as completed here.
        if (awaiter.IsCompleted) {
            Next();
        } else {
            awaiter.OnCompleted(() => Next());
        }
    }

    /// <summary>Records one finished request and releases its in-flight slot.</summary>
    private void Next()
    {
        Interlocked.Increment(ref CompletedCount);
        ReleaseInFlight();
    }

    /// <summary>
    /// Decrements the in-flight counter; whoever brings it to zero publishes the final report.
    /// </summary>
    private void ReleaseInFlight()
    {
        if (Interlocked.Decrement(ref RequestsInFlight) == 0) {
            FinalReport();
        }
    }

    /// <summary>Prints totals and throughput, then opens the final gate.</summary>
    private void FinalReport()
    {
        var elapsed = stopwatch.Elapsed;
        Console.WriteLine($"Completed {CompletedCount:N0} Test Requests");
        Console.WriteLine($"Test took {elapsed.TotalSeconds:N3} Seconds {TestMessageCount / elapsed.TotalSeconds:N2} Per Second");
        FinalGate.SetResult();
    }
}
//
// Data Model
//
/// <summary>
/// A single outgoing request. As a record it compares by value, so two instances with the
/// same <see cref="Id"/> are equal when used as a dictionary key.
/// </summary>
public record RequestItem(int Id);
/// <summary>The response produced for the request that carries the same <see cref="Id"/>.</summary>
public record ResponseItem(int Id);
/// <summary>
/// The reply to one batch: every request that was sent, mapped to its response.
/// </summary>
public class BatchResponse
{
    // Keyed by the originating request so each caller can pick its own
    // response back out of the shared batch result.
    public Dictionary<RequestItem, ResponseItem> Items { get; } =
        new Dictionary<RequestItem, ResponseItem>();
}
//
// The core
//
/// <summary>
/// Collects individual requests into the current <see cref="Batch"/> and, roughly once per
/// second, fires that batch and replaces it with a fresh one. Each caller awaits the batch
/// its request joined and receives only its own response.
/// </summary>
/// <remarks>
/// <see cref="Dispose"/> stops the background loop after one last flush. Nothing fires
/// batches after that, so a request submitted once the loop has exited never completes.
/// </remarks>
public class BatchManager : IDisposable
{
    public BatchManager()
    {
        // Fire-and-forget: the loop runs until Dispose cancels it.
        _ = SendAndRotateBatchIfReady();
    }

    // Guards CurrentBatch and every Enqueue on it.
    private readonly object Lck = new();
    private CancellationTokenSource Cts { get; } = new();
    private Batch CurrentBatch { get; set; } = new();

    /// <summary>
    /// Adds <paramref name="request"/> to the current batch and waits for that batch's reply.
    /// </summary>
    /// <param name="request">The request to send.</param>
    /// <returns>The response matching <paramref name="request"/>.</returns>
    public async Task<ResponseItem> SendSpecificRequest(RequestItem request)
    {
        Batch localCopy;
        lock (Lck) {
            // Capture the batch while holding the lock: once the lock is released the
            // background loop may fire this batch and swap in a new one.
            localCopy = CurrentBatch;
            localCopy.Enqueue(request);
        }

        var batch = await localCopy.Result;
        return batch.Items[request];
    }

    /// <summary>
    /// Background loop: every second (or immediately on cancellation) sends the current
    /// batch if it holds any requests.
    /// </summary>
    private async Task SendAndRotateBatchIfReady()
    {
        var token = Cts.Token;

        while (!token.IsCancellationRequested) {
            try {
                await Task.Delay(1000, token);
            } catch (OperationCanceledException) {
                // Dispose was called mid-wait. Fall through so requests that are already
                // queued still get one final flush before the loop exits.
            }

            Batch ready;
            lock (Lck) {
                if (CurrentBatch.Count == 0) {
                    continue;
                }

                ready = CurrentBatch;
                CurrentBatch = new Batch();
            }

            // Fire outside the lock: completing the batch's gate can run its send
            // continuation inline, and that must not block callers waiting to enqueue.
            ready.Fire();
        }
    }

    /// <summary>Signals the background loop to stop.</summary>
    public void Dispose() => Cts.Cancel();
}
/// <summary>
/// One batch of requests. Requests are added with <see cref="Enqueue"/>; calling
/// <see cref="Fire"/> releases the batch to the (simulated) server, and
/// <see cref="Result"/> completes with the combined response.
/// </summary>
/// <remarks>
/// This class does no locking of its own. NOTE(review): callers appear to be expected to
/// serialize <see cref="Enqueue"/> and to stop enqueuing once <see cref="Fire"/> has been
/// called — confirm against the owning manager.
/// </remarks>
public class Batch
{
    public Batch()
    {
        // Start the send pipeline now; it parks on the fire gate until Fire() is called.
        Result = SendBatch();
    }

    /// <summary>Completes with the server's reply once the batch has been fired and answered.</summary>
    public Task<BatchResponse> Result { get; }

    // RunContinuationsAsynchronously keeps Fire() cheap: the send continuation is queued
    // instead of running inline on whichever thread (and under whichever lock) calls Fire().
    private TaskCompletionSource FireCompletionSource { get; } =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    private List<RequestItem> OutgoingRequests { get; } = new();

    /// <summary>Number of requests queued so far.</summary>
    public int Count => OutgoingRequests.Count;

    /// <summary>Adds a request to this batch.</summary>
    public void Enqueue(RequestItem request) => OutgoingRequests.Add(request);

    /// <summary>Releases the batch for sending. A second call throws, as the gate is already set.</summary>
    public void Fire() => FireCompletionSource.SetResult();

    /// <summary>Waits for <see cref="Fire"/>, then sends every queued request.</summary>
    private async Task<BatchResponse> SendBatch()
    {
        await FireCompletionSource.Task;//wait until we are told to go
        Console.WriteLine($"Sending a batch of {OutgoingRequests.Count} requests to server...");
        return await GetResponseFromServer(OutgoingRequests);
    }

    /// <summary>Simulated server call: waits one second, then answers each request with its own id.</summary>
    /// <param name="requests">The requests contained in the batch.</param>
    /// <returns>A response keyed by request.</returns>
    private static async Task<BatchResponse> GetResponseFromServer(IReadOnlyList<RequestItem> requests)
    {
        //fake delay simulating processing time
        await Task.Delay(1_000);

        var responseBatch = new BatchResponse();

        // Size the map once up front rather than growing it repeatedly for large batches.
        responseBatch.Items.EnsureCapacity(requests.Count);

        foreach (var req in requests) {
            // Indexer assignment rather than Add: requests are records with value equality,
            // so a batch may contain equal keys. Add would throw on the second one and fault
            // the batch for every waiter; equal requests map to equal responses anyway.
            responseBatch.Items[req] = new ResponseItem(req.Id);
        }

        return responseBatch;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment