Skip to content

Instantly share code, notes, and snippets.

@tebeco
Last active September 15, 2022 18:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tebeco/cb38900b18e5e0920d4f37fbdd8c23a5 to your computer and use it in GitHub Desktop.
Save tebeco/cb38900b18e5e0920d4f37fbdd8c23a5 to your computer and use it in GitHub Desktop.
UDP
<!-- MSBuild project definition using the Microsoft.NET.Sdk project SDK. -->
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<!-- Produce an executable rather than a class library. -->
<OutputType>Exe</OutputType>
<!-- Compile against and run on .NET 7. -->
<TargetFramework>net7.0</TargetFramework>
<!-- Let the SDK add its default set of global using directives to every source file. -->
<ImplicitUsings>enable</ImplicitUsings>
<!-- Turn on nullable reference type analysis for the whole project. -->
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<!-- Supplies Pipe / PipeReader / PipeWriter. "7.0.0-*" is a NuGet floating version pattern. -->
<PackageReference Include="System.IO.Pipelines" Version="7.0.0-*" />
</ItemGroup>
</Project>
using System.Buffers;
using System.IO.Pipelines;
using System.Net;
using System.Net.Sockets;
using System.Text;
// Server entry point: bind a UDP socket to port 11000 on every local interface,
// then pump received datagrams through a Pipe and print each '\n'-terminated line.

// The parameterless constructor creates an unbound socket; the local port is fixed
// by the explicit Bind call below. `using` releases the socket when the program ends.
using var udpClient = new UdpClient();
// ReuseAddress must be set before Bind to have any effect on the binding.
udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
var anyEndpoint = new IPEndPoint(IPAddress.Any, 11000);
udpClient.Client.Bind(anyEndpoint);
var pipe = new Pipe();
// Producer: socket -> pipe. Consumer: pipe -> console. Both run concurrently.
Task writing = FillPipeAsync(udpClient, anyEndpoint, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
// Receives UDP datagrams from <paramref name="udpClient"/> and appends each payload
// to <paramref name="writer"/> until the reading side of the pipe completes.
//   udpClient - client whose underlying socket is already bound to a local endpoint.
//   endpoint  - endpoint passed to ReceiveMessageFromAsync as the remote-endpoint argument.
//   writer    - pipe writer that receives the raw datagram bytes.
// The writer is always completed before this method returns or throws; on failure the
// exception is handed to the pipe (so the reader stops waiting) and then rethrown.
async Task FillPipeAsync(UdpClient udpClient, EndPoint endpoint, PipeWriter writer)
{
const int minimumBufferSize = 512;
try
{
while (true)
{
// Ask the PipeWriter for at least 512 bytes to receive into.
var memory = writer.GetMemory(minimumBufferSize);
var receiveResult = await udpClient.Client.ReceiveMessageFromAsync(memory, endpoint);
var bytesRead = receiveResult.ReceivedBytes;
if (bytesRead == 0)
{
// UDP is connectionless: a zero-length datagram is a legal packet, not an
// end-of-stream marker, so skip it instead of shutting the receiver down.
continue;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
// The reader has completed; nobody will consume further data.
break;
}
}
}
catch (Exception ex)
{
// Without this the reader would wait forever on a writer that will never complete.
await writer.CompleteAsync(ex);
throw;
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
// Drains the pipe: every complete '\n'-terminated line is handed to ProcessLine,
// and any trailing partial line is left in the pipe until more data arrives.
// Returns once the writer has completed and the last read has been handled.
async Task ReadPipeAsync(PipeReader reader)
{
ReadResult readResult;
do
{
readResult = await reader.ReadAsync();
ReadOnlySequence<byte> pending = readResult.Buffer;
// TryReadLine trims each extracted line off the front of `pending`.
while (TryReadLine(ref pending, out ReadOnlySequence<byte> completeLine))
{
ProcessLine(completeLine);
}
// Consumed: everything before pending.Start. Examined: the whole buffer, so the
// next ReadAsync only returns once new data has been written (or the writer completes).
reader.AdvanceTo(pending.Start, pending.End);
}
while (!readResult.IsCompleted);
// Signal that this reader will not read again.
await reader.CompleteAsync();
}
// Tries to split one '\n'-terminated line off the front of <paramref name="buffer"/>.
// On success: <paramref name="line"/> holds the bytes before the '\n' (terminator excluded),
// <paramref name="buffer"/> is advanced past the '\n', and the result is true.
// On failure (no '\n' present): line is default, buffer is untouched, result is false.
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
if (buffer.PositionOf((byte)'\n') is not SequencePosition newline)
{
line = default;
return false;
}
line = buffer.Slice(buffer.Start, newline);
// Step one byte beyond the terminator so it is not seen again.
var afterNewline = buffer.GetPosition(1, newline);
buffer = buffer.Slice(afterNewline);
return true;
}
// Decodes <paramref name="payload"/> as UTF-8 and prints it to the console
// prefixed with "[RECEIVED] ".
void ProcessLine(ReadOnlySequence<byte> payload)
    => Console.WriteLine("[RECEIVED] " + Encoding.UTF8.GetString(payload));
using System.Buffers.Text;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Text;
// Client entry point: once per second a "frame <n>\n" message is written into a Pipe,
// and everything read back out of the Pipe is sent to the UDP server on localhost:11000.
const string messageHeader = "frame ";
const string messageSeparator = "\n";
// Longest textual form of an Int32 ("-2147483648").
const int maxInt32TextLength = 11;
// Messages are encoded as UTF-8, so sizes are measured in UTF-8 bytes; sizeof(char)
// would give UTF-16 code-unit sizes and double every length.
int messageHeaderLength = Encoding.UTF8.GetByteCount(messageHeader);
int messageLength = messageHeaderLength + maxInt32TextLength + Encoding.UTF8.GetByteCount(messageSeparator);
// The parameterless constructor creates an unbound socket; Connect below sets the
// default remote host. `using` releases the socket when the program ends.
using var udpClient = new UdpClient();
udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
udpClient.Connect("localhost", 11000);
var pipe = new Pipe();
// Producer: timer -> pipe. Consumer: pipe -> socket. Both run concurrently.
Task writing = FillPipeAsync(udpClient, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
// Once per second, writes a UTF-8 "frame <n>\n" message (n = 1, 2, 3, ...) into
// <paramref name="writer"/>. Stops when the reading side of the pipe has completed,
// then completes the writer and closes <paramref name="udpClient"/>.
async Task FillPipeAsync(UdpClient udpClient, PipeWriter writer)
{
// Encodes one message into `destination` and returns the number of bytes written.
// Kept synchronous because Span<byte> locals cannot live in an async method here.
int WriteFrame(Span<byte> destination, int sequenceNumber)
{
// Every offset comes from the byte count actually produced by the previous step,
// so the message is contiguous and no uninitialised bytes are committed.
var offset = Encoding.UTF8.GetBytes(messageHeader, destination);
// Cannot fail: the buffer was requested with room for the longest Int32 text.
Utf8Formatter.TryFormat(sequenceNumber, destination.Slice(offset), out var digitBytes);
offset += digitBytes;
offset += Encoding.UTF8.GetBytes(messageSeparator, destination.Slice(offset));
return offset;
}

// PeriodicTimer is disposable; release it when the method exits.
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
var i = 0;
while (await timer.WaitForNextTickAsync())
{
++i;
var memory = writer.GetMemory(messageLength);
var writtenInPipe = WriteFrame(memory.Span, i);
Console.WriteLine($"[SENT TO PIPE WRITER] {writtenInPipe} bytes");
// Commit exactly the bytes that were encoded.
writer.Advance(writtenInPipe);
var result = await writer.FlushAsync();
if (result.IsCompleted)
{
// The reader has completed; nothing further will be consumed.
break;
}
}
// Tell the reader that no more data is coming.
await writer.CompleteAsync();
udpClient.Close();
}
// Forwards everything read from <paramref name="reader"/> to the server through the
// connected `udpClient` declared in the enclosing top-level scope. Returns once the
// writer has completed and the final buffer has been sent.
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
var result = await reader.ReadAsync();
// The buffer may consist of several memory segments; send each one in turn.
foreach (var buffer in result.Buffer)
{
Console.WriteLine($"[RECEIVED FROM PIPE READER] {buffer.Length} bytes");
var remainingToSend = buffer;
// A pre-checked loop never transmits a zero-length datagram for an empty segment.
while (!remainingToSend.IsEmpty)
{
var written = await udpClient.Client.SendAsync(remainingToSend);
remainingToSend = remainingToSend.Slice(written);
}
}
// Everything handed out by this read has been sent; mark it all consumed.
reader.AdvanceTo(result.Buffer.End);
// Stop once the writer has completed, otherwise ReadAsync would keep returning
// immediately and this loop would spin.
if (result.IsCompleted)
{
break;
}
}
// Signal that this reader will not read again.
await reader.CompleteAsync();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment