Skip to content

Instantly share code, notes, and snippets.

@AceHack
Created September 26, 2018 22:42
Show Gist options
  • Save AceHack/786e558f14b2d9c0d46a1ea330c158f7 to your computer and use it in GitHub Desktop.
Save AceHack/786e558f14b2d9c0d46a1ea330c158f7 to your computer and use it in GitHub Desktop.
Pipelines Client/Server Test
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace PipelinesTest
{
static class Program
{
const int _padding = 30;
readonly static TimeSpan _duration = TimeSpan.FromSeconds(60);
readonly static TaskCompletionSource<bool> _tcs = new TaskCompletionSource<bool>();
static long Bytes = 0;
static long Produced = 0;
static long Consumed = 0;
static long Latency = 0;
static long Errors = 0;
const int _port = 5100;
const int _minimumBufferSize = 512;
static Pipe _clientPipe;
static Socket _listenSocket;
static Socket _connectSocket;
static Socket _acceptSocket;
static async Task Main(string[] args)
{
await InitializeTestAsync().ConfigureAwait(false);
var start = DateTime.UtcNow;
//Start display loop, don't wait
RunDisplayLoopAsync(start);
await RunTestAsync(start).ConfigureAwait(false);
await ShutdownTestAsync().ConfigureAwait(false);
Console.ReadKey(true);
}
static long SpeculativeLockFreeAddition(ref long field, long operand)
{
SpinWait spinWait = new SpinWait();
while (true)
{
long snapshot = field;
long calc = snapshot + operand;
if (Interlocked.CompareExchange(ref field, calc, snapshot) == snapshot) return calc;
spinWait.SpinOnce();
}
}
static async Task InitializeTestAsync()
{
_listenSocket = new Socket(SocketType.Stream, System.Net.Sockets.ProtocolType.Tcp);
_connectSocket = new Socket(SocketType.Stream, System.Net.Sockets.ProtocolType.Tcp);
_clientPipe = new Pipe(new PipeOptions(useSynchronizationContext: false, minimumSegmentSize: 65536));
var serverPipe = new Pipe(new PipeOptions(useSynchronizationContext: false, minimumSegmentSize: 65536));
_listenSocket.Bind(new IPEndPoint(IPAddress.Loopback, _port));
_listenSocket.Listen(120);
var acceptSocketTask = _listenSocket.AcceptAsync().ConfigureAwait(false);
await _connectSocket.ConnectAsync(IPAddress.Loopback, _port).ConfigureAwait(false);
_acceptSocket = await acceptSocketTask;
//Don't await these
WriteToServerPipeAsync(serverPipe.Writer);
ReadFromServerPipeAsync(serverPipe.Reader);
ReadFromClientPipeAsync(_clientPipe.Reader);
}
static async Task WriteToServerPipeAsync(PipeWriter writer)
{
while (true)
{
try
{
// Request a minimum of 512 bytes from the PipeWriter
Memory<byte> memory = writer.GetMemory(_minimumBufferSize);
int bytesRead = await _acceptSocket.ReceiveAsync(memory, SocketFlags.None).ConfigureAwait(false);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read
writer.Advance(bytesRead);
}
catch
{
break;
}
// Make the data available to the PipeReader
FlushResult result = await writer.FlushAsync().ConfigureAwait(false);
if (result.IsCompleted)
{
break;
}
}
// Signal to the reader that we're done writing
writer.Complete();
}
static async Task ReadFromServerPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync().ConfigureAwait(false);
ReadOnlySequence<byte> buffer = result.Buffer;
SequencePosition? position = null;
do
{
// Find the EOL
position = buffer.PositionOf((byte)'\n');
if (position != null)
{
var line = buffer.Slice(0, position.Value);
ProcessLine(line);
// This is equivalent to position + 1
var next = buffer.GetPosition(1, position.Value);
// Skip what we've already processed including \n
buffer = buffer.Slice(next);
}
}
while (position != null);
// We sliced the buffer until no more data could be processed
// Tell the PipeReader how much we consumed and how much we left to process
reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted)
{
break;
}
}
reader.Complete();
}
static async Task ReadFromClientPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync().ConfigureAwait(false);
ReadOnlySequence<byte> buffer = result.Buffer;
foreach (var segment in buffer)
{
await _connectSocket.SendAsync(segment, SocketFlags.None).ConfigureAwait(false);
}
reader.AdvanceTo(buffer.End);
if (result.IsCompleted)
{
break;
}
}
reader.Complete();
}
static void ProcessLine(in ReadOnlySequence<byte> buffer)
{
var now = DateTime.UtcNow.Ticks;
var ts = long.Parse(Encoding.UTF8.GetString(buffer.ToArray()));
if (ts == 0)
{
_tcs.SetResult(true);
}
else
{
Interlocked.Increment(ref Consumed);
SpeculativeLockFreeAddition(ref Latency, now - ts);
}
}
static async Task RunTestAsync(DateTime start)
{
while (_duration > DateTime.UtcNow - start)
{
try
{
await RunOneIterationAsync().ConfigureAwait(false);
}
catch
{
Interlocked.Increment(ref Errors);
}
}
}
static async Task RunOneIterationAsync()
{
var data = Encoding.UTF8.GetBytes($"{DateTime.UtcNow.Ticks.ToString()}\n");
await _clientPipe.Writer.WriteAsync(data.AsMemory()).ConfigureAwait(false);
Interlocked.Increment(ref Produced);
SpeculativeLockFreeAddition(ref Bytes, data.Length);
}
static async Task ShutdownTestAsync()
{
await _clientPipe.Writer.WriteAsync(Encoding.UTF8.GetBytes($"{0}\n").AsMemory()).ConfigureAwait(false);
await _tcs.Task.ConfigureAwait(false);
_clientPipe.Writer.Complete();
_connectSocket.Dispose();
_listenSocket.Dispose();
}
static async Task RunDisplayLoopAsync(DateTime start)
{
Bytes = 0;
Produced = 0;
Consumed = 0;
Latency = 0;
Console.CursorVisible = false;
while (!_tcs.Task.IsCompleted)
{
WriteOutput(start);
await Task.Delay(100).ConfigureAwait(false);
}
WriteOutput(start);
}
static void WriteOutput(DateTime start)
{
var elapsed = DateTime.UtcNow - start;
Console.Clear();
Console.WriteLine("Elapsed: ".PadRight(_padding) + $"{elapsed:c}");
WriteMetric(nameof(Bytes), Bytes, elapsed);
WriteMetric(nameof(Produced), Produced, elapsed);
WriteMetric(nameof(Consumed), Consumed, elapsed, Latency);
WriteMetric(nameof(Errors), Errors, elapsed);
}
static void WriteMetric(string name, long metric, TimeSpan elapsed, long latency = 0)
{
if (metric > 0)
{
var totalSeconds = elapsed.TotalSeconds;
Console.WriteLine($"{name} (total): ".PadRight(_padding) + $"{metric:N0}");
if (totalSeconds > 0)
{
Console.WriteLine($"{name} (per second): ".PadRight(_padding) + $"{metric / totalSeconds:N0}");
}
if (latency > 0)
{
var latencyMs = new TimeSpan(latency).TotalMilliseconds;
Console.WriteLine($"Average Latency: ".PadRight(_padding) + $"{latencyMs / metric} ms");
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment