Created
September 26, 2018 22:42
-
-
Save AceHack/786e558f14b2d9c0d46a1ea330c158f7 to your computer and use it in GitHub Desktop.
Pipelines Client/Server Test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Buffers; | |
using System.IO.Pipelines; | |
using System.Net; | |
using System.Net.Sockets; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace PipelinesTest | |
{ | |
static class Program | |
{ | |
// Column width that metric labels are padded to in the console output.
const int _padding = 30;
// Wall-clock time the producer loop in RunTestAsync keeps sending messages.
static readonly TimeSpan _duration = TimeSpan.FromSeconds(60);
// Completed by ProcessLine when the "0" sentinel line arrives; awaited by ShutdownTestAsync
// and polled by the display loop. Continuations are forced to run asynchronously so that
// completing it does not execute the shutdown path (and Main's blocking Console.ReadKey)
// inline on the task that is reading the server pipe.
static readonly TaskCompletionSource<bool> _tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
// Total payload bytes written to the client pipe.
static long Bytes = 0;
// Number of messages written to the client pipe.
static long Produced = 0;
// Number of non-sentinel lines handled by ProcessLine.
static long Consumed = 0;
// Sum, in ticks, of (receive time - embedded send time) over all consumed lines.
static long Latency = 0;
// Number of producer iterations that threw.
static long Errors = 0;
// Loopback TCP port used for both the listening and the connecting socket.
const int _port = 5100;
// Minimum buffer size requested from the PipeWriter for each socket receive.
const int _minimumBufferSize = 512;
// Pipe whose writer is fed by the producer and whose reader is drained into _connectSocket.
static Pipe _clientPipe;
static Socket _listenSocket;
static Socket _connectSocket;
// Server-side end of the loopback connection, obtained from _listenSocket.AcceptAsync.
static Socket _acceptSocket;
/// <summary>
/// Entry point: initializes the sockets and pipes, runs the timed producer loop while a
/// background loop prints statistics, shuts the test down, then waits for a key press.
/// </summary>
static async Task Main(string[] args)
{
    await InitializeTestAsync().ConfigureAwait(false);

    DateTime startedAt = DateTime.UtcNow;

    // Fire-and-forget on purpose: the display loop refreshes the console on its own
    // schedule and is never awaited.
    _ = RunDisplayLoopAsync(startedAt);

    await RunTestAsync(startedAt).ConfigureAwait(false);
    await ShutdownTestAsync().ConfigureAwait(false);

    // Keep the console window (and the final statistics) visible until a key is pressed.
    Console.ReadKey(true);
}
/// <summary>
/// Atomically adds <paramref name="operand"/> to <paramref name="field"/>.
/// </summary>
/// <param name="field">The 64-bit counter to update in place.</param>
/// <param name="operand">The amount to add (may be negative).</param>
/// <returns>The value of <paramref name="field"/> after the addition.</returns>
static long SpeculativeLockFreeAddition(ref long field, long operand)
{
    // Interlocked.Add performs the read-modify-write atomically and returns the new
    // value, which is exactly what a compare-exchange retry loop would compute by hand.
    return Interlocked.Add(ref field, operand);
}
/// <summary>
/// Creates the listening and connecting sockets, establishes the loopback connection,
/// creates the client and server pipes, and starts the three background pump loops.
/// </summary>
static async Task InitializeTestAsync()
{
    // Both pipes are built with identical options.
    Pipe NewPipe()
    {
        return new Pipe(new PipeOptions(useSynchronizationContext: false, minimumSegmentSize: 65536));
    }

    _listenSocket = new Socket(SocketType.Stream, System.Net.Sockets.ProtocolType.Tcp);
    _connectSocket = new Socket(SocketType.Stream, System.Net.Sockets.ProtocolType.Tcp);
    _clientPipe = NewPipe();
    Pipe serverPipe = NewPipe();

    var listenEndPoint = new IPEndPoint(IPAddress.Loopback, _port);
    _listenSocket.Bind(listenEndPoint);
    _listenSocket.Listen(120);

    // Start accepting before connecting so both halves of the handshake can proceed.
    var pendingAccept = _listenSocket.AcceptAsync().ConfigureAwait(false);
    await _connectSocket.ConnectAsync(IPAddress.Loopback, _port).ConfigureAwait(false);
    _acceptSocket = await pendingAccept;

    // The pump loops run in the background for the rest of the test; they are not awaited.
    _ = WriteToServerPipeAsync(serverPipe.Writer);
    _ = ReadFromServerPipeAsync(serverPipe.Reader);
    _ = ReadFromClientPipeAsync(_clientPipe.Reader);
}
/// <summary>
/// Pumps bytes received on the accepted socket into <paramref name="writer"/> until the
/// socket returns zero bytes, a receive/advance step throws, or the flush reports
/// completion; then completes the writer.
/// </summary>
/// <param name="writer">The server pipe's writer.</param>
static async Task WriteToServerPipeAsync(PipeWriter writer)
{
    bool keepPumping = true;
    while (keepPumping)
    {
        try
        {
            // Ask the pipe for at least _minimumBufferSize bytes of writable memory.
            Memory<byte> target = writer.GetMemory(_minimumBufferSize);
            int received = await _acceptSocket.ReceiveAsync(target, SocketFlags.None).ConfigureAwait(false);
            if (received == 0)
            {
                break;
            }

            // Commit exactly the bytes the socket delivered.
            writer.Advance(received);
        }
        catch
        {
            // Any failure while receiving ends the pump.
            break;
        }

        // Publish the committed bytes to the reader side; stop once the flush reports completion.
        FlushResult flushed = await writer.FlushAsync().ConfigureAwait(false);
        keepPumping = !flushed.IsCompleted;
    }

    // Let the reader know no more data is coming.
    writer.Complete();
}
/// <summary>
/// Reads from <paramref name="reader"/>, splits the data on '\n', and hands each line
/// (without the terminator) to <c>ProcessLine</c>. Stops when a read reports completion,
/// then completes the reader.
/// </summary>
/// <param name="reader">The server pipe's reader.</param>
static async Task ReadFromServerPipeAsync(PipeReader reader)
{
    ReadResult read;
    do
    {
        read = await reader.ReadAsync().ConfigureAwait(false);
        ReadOnlySequence<byte> pending = read.Buffer;

        // Peel off complete lines for as long as a terminator can be found.
        while (pending.PositionOf((byte)'\n') is SequencePosition newline)
        {
            ProcessLine(pending.Slice(0, newline));

            // Resume one byte past the terminator.
            pending = pending.Slice(pending.GetPosition(1, newline));
        }

        // Everything before pending.Start is consumed; the remainder (a partial line)
        // is marked as examined so the next read waits for more data.
        reader.AdvanceTo(pending.Start, pending.End);
    }
    while (!read.IsCompleted);

    reader.Complete();
}
/// <summary>
/// Drains <paramref name="reader"/> into the connecting socket, one buffer segment at a
/// time. Stops when a read reports completion, then completes the reader.
/// </summary>
/// <param name="reader">The client pipe's reader.</param>
static async Task ReadFromClientPipeAsync(PipeReader reader)
{
    ReadResult read;
    do
    {
        read = await reader.ReadAsync().ConfigureAwait(false);
        ReadOnlySequence<byte> outgoing = read.Buffer;

        foreach (ReadOnlyMemory<byte> chunk in outgoing)
        {
            await _connectSocket.SendAsync(chunk, SocketFlags.None).ConfigureAwait(false);
        }

        // Every segment was handed to the socket, so the whole buffer is consumed.
        reader.AdvanceTo(outgoing.End);
    }
    while (!read.IsCompleted);

    reader.Complete();
}
/// <summary>
/// Handles one received line. The line is expected to hold a decimal tick count:
/// <c>0</c> is the shutdown sentinel and completes <c>_tcs</c>; any other value is treated
/// as the send timestamp and contributes to <c>Consumed</c> and <c>Latency</c>.
/// A line that does not parse as a 64-bit integer is counted in <c>Errors</c> and skipped.
/// </summary>
/// <param name="buffer">The bytes of the line, without the trailing '\n'.</param>
static void ProcessLine(in ReadOnlySequence<byte> buffer)
{
    // Capture the receive time first so parsing cost is not counted as latency.
    var now = DateTime.UtcNow.Ticks;
    var text = Encoding.UTF8.GetString(buffer.ToArray());

    // TryParse instead of Parse: this runs inside an un-awaited reader loop, so a thrown
    // FormatException would silently stop consumption and the sentinel would never be seen.
    // The invariant culture keeps parsing independent of the machine's regional settings.
    if (!long.TryParse(
            text,
            System.Globalization.NumberStyles.Integer,
            System.Globalization.CultureInfo.InvariantCulture,
            out var ts))
    {
        Interlocked.Increment(ref Errors);
        return;
    }

    if (ts == 0)
    {
        // TrySetResult so that a repeated sentinel cannot throw.
        _tcs.TrySetResult(true);
    }
    else
    {
        Interlocked.Increment(ref Consumed);
        SpeculativeLockFreeAddition(ref Latency, now - ts);
    }
}
/// <summary>
/// Repeatedly runs one producer iteration until <c>_duration</c> has elapsed since
/// <paramref name="start"/>. An iteration that throws is counted in <c>Errors</c> and
/// the loop continues.
/// </summary>
/// <param name="start">The UTC time the test started.</param>
static async Task RunTestAsync(DateTime start)
{
    for (TimeSpan elapsed = DateTime.UtcNow - start;
         elapsed < _duration;
         elapsed = DateTime.UtcNow - start)
    {
        try
        {
            await RunOneIterationAsync().ConfigureAwait(false);
        }
        catch
        {
            // Count the failure and keep producing.
            Interlocked.Increment(ref Errors);
        }
    }
}
/// <summary>
/// Writes one message — the current UTC tick count followed by '\n' — to the client
/// pipe, then updates the <c>Produced</c> and <c>Bytes</c> counters.
/// </summary>
static async Task RunOneIterationAsync()
{
    string message = DateTime.UtcNow.Ticks.ToString() + "\n";
    byte[] payload = Encoding.UTF8.GetBytes(message);

    await _clientPipe.Writer.WriteAsync(new ReadOnlyMemory<byte>(payload)).ConfigureAwait(false);

    // Counters are only bumped after the write has been accepted by the pipe.
    Interlocked.Increment(ref Produced);
    SpeculativeLockFreeAddition(ref Bytes, payload.Length);
}
/// <summary>
/// Sends the "0" sentinel line through the client pipe, waits until the consumer has
/// seen it (signalled through <c>_tcs</c>), then completes the client pipe writer and
/// disposes all three sockets.
/// </summary>
static async Task ShutdownTestAsync()
{
    // "0\n" is the sentinel ProcessLine recognizes as end-of-test.
    byte[] sentinel = Encoding.UTF8.GetBytes("0\n");
    await _clientPipe.Writer.WriteAsync(sentinel.AsMemory()).ConfigureAwait(false);

    // The sentinel travels behind all earlier messages, so once it has been processed
    // every produced message has been consumed.
    await _tcs.Task.ConfigureAwait(false);

    _clientPipe.Writer.Complete();
    _connectSocket.Dispose();
    // The accepted (server-side) socket is a separate handle from the listener and must
    // be released as well; previously it was never disposed.
    _acceptSocket?.Dispose();
    _listenSocket.Dispose();
}
/// <summary>
/// Resets the throughput and latency counters, hides the cursor, and redraws the
/// statistics every 100 ms until <c>_tcs</c> completes; one final redraw is performed
/// after completion is observed.
/// </summary>
/// <param name="start">The UTC time the test started, used to compute elapsed time.</param>
static async Task RunDisplayLoopAsync(DateTime start)
{
    Bytes = 0;
    Produced = 0;
    Consumed = 0;
    Latency = 0;
    Console.CursorVisible = false;

    for (;;)
    {
        // Sample completion before drawing so the last frame is drawn after the
        // completed state has been observed.
        bool finished = _tcs.Task.IsCompleted;
        WriteOutput(start);
        if (finished)
        {
            return;
        }

        await Task.Delay(100).ConfigureAwait(false);
    }
}
/// <summary>
/// Clears the console and prints the elapsed time followed by the current value of
/// each counter.
/// </summary>
/// <param name="start">The UTC time the test started.</param>
static void WriteOutput(DateTime start)
{
    TimeSpan elapsed = DateTime.UtcNow - start;

    Console.Clear();

    string elapsedLabel = "Elapsed: ".PadRight(_padding);
    Console.WriteLine(elapsedLabel + elapsed.ToString("c"));

    WriteMetric(nameof(Bytes), Bytes, elapsed);
    WriteMetric(nameof(Produced), Produced, elapsed);
    // Latency is reported together with Consumed because the average is per consumed line.
    WriteMetric(nameof(Consumed), Consumed, elapsed, Latency);
    WriteMetric(nameof(Errors), Errors, elapsed);
}
/// <summary>
/// Prints the total and per-second rate for one counter, plus the average latency when
/// <paramref name="latency"/> is positive. Nothing is printed while the counter is zero
/// or negative.
/// </summary>
/// <param name="name">Label for the counter.</param>
/// <param name="metric">Current counter value.</param>
/// <param name="elapsed">Time elapsed since the test started.</param>
/// <param name="latency">Accumulated latency in ticks; 0 to omit the latency line.</param>
static void WriteMetric(string name, long metric, TimeSpan elapsed, long latency = 0)
{
    if (metric <= 0)
    {
        return;
    }

    double seconds = elapsed.TotalSeconds;

    Console.WriteLine($"{name} (total): ".PadRight(_padding) + $"{metric:N0}");

    // Skip the rate while no time has elapsed to avoid dividing by zero.
    if (seconds > 0)
    {
        Console.WriteLine($"{name} (per second): ".PadRight(_padding) + $"{metric / seconds:N0}");
    }

    if (latency > 0)
    {
        // latency is a tick count; convert the sum to milliseconds before averaging.
        double totalLatencyMs = TimeSpan.FromTicks(latency).TotalMilliseconds;
        Console.WriteLine("Average Latency: ".PadRight(_padding) + $"{totalLatencyMs / metric} ms");
    }
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment