Skip to content

Instantly share code, notes, and snippets.

@namse
Created April 12, 2019 13:46
Show Gist options
  • Save namse/b26f7317cf63e5d63027e20ddefd444f to your computer and use it in GitHub Desktop.
Save namse/b26f7317cf63e5d63027e20ddefd444f to your computer and use it in GitHub Desktop.
BaseNetworkConnector using System.IO.Pipelines
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using MessageSerializer;
namespace NetworkConnector.Connector
{
public abstract class BaseNetworkConnector
{
// Serializes outgoing messages (SendMessageAsync) and deserializes incoming bodies (ReadPipeAsync).
private readonly BaseMessageSerializer _messageSerializer;
// Single cancellation source for this connector: Stop() cancels it, and its token is handed to
// the background pumps started by Start() and to pending dequeues in ReceiveMessageAsync.
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
// Holds deserialized incoming messages until ReceiveMessageAsync dequeues them.
private readonly AsyncConcurrentQueue<object> _messageQueue = new AsyncConcurrentQueue<object>();
// Type-name -> Type lookup cache used by GetType; static, so it is shared by every connector instance.
private static readonly ConcurrentDictionary<string, Type> TypeCaches = new ConcurrentDictionary<string, Type>();
/// <summary>Set to true by <see cref="Start"/> and back to false by <see cref="Stop"/>.</summary>
public bool IsRunning { get; private set; }
/// <summary>Whether the connector is connected; implemented by the derived class. <see cref="Start"/> throws when this is false.</summary>
public abstract bool IsConnected { get; }
/// <summary>
/// Initializes the connector with the serializer used for every message it sends and receives.
/// </summary>
/// <param name="messageSerializer">Serializer for message bodies; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="messageSerializer"/> is null.</exception>
protected BaseNetworkConnector(BaseMessageSerializer messageSerializer)
{
    // Fail fast here: a null serializer would otherwise only surface later as a
    // NullReferenceException in SendMessageAsync or inside the background read pump.
    _messageSerializer = messageSerializer ?? throw new ArgumentNullException(nameof(messageSerializer));
}
/// <summary>
/// Starts the two background pumps: one copies received packets into an in-memory pipe,
/// the other parses messages out of that pipe and queues them for <see cref="ReceiveMessageAsync{TMessage}"/>.
/// </summary>
/// <remarks>
/// Calling this while the connector is already running is a no-op. The connector uses a single
/// cancellation source that is cancelled by <see cref="Stop"/> or when either pump ends, so a
/// stopped connector cannot be restarted.
/// </remarks>
/// <exception cref="Exception">The connector is not connected (see <see cref="CheckIsConnectedOrThrowException"/>).</exception>
public void Start()
{
    CheckIsConnectedOrThrowException();
    if (IsRunning)
    {
        // A second pair of pumps would compete with the first one for the same incoming bytes.
        return;
    }

    var pipe = new Pipe();

    // Publish the running state before launching the pumps so that a pump that finishes
    // immediately cannot have its "IsRunning = false" overwritten by this method.
    IsRunning = true;

    _ = Task.Run(() => RunPumpAsync(() => FillPipeAsync(pipe.Writer, _cancellationTokenSource.Token)));
    _ = Task.Run(() => RunPumpAsync(() => ReadPipeAsync(pipe.Reader, _cancellationTokenSource.Token)));
}

/// <summary>
/// Runs one background pump to completion, logs any failure, and then shuts the connector down
/// so the other pump and any pending receive are released as well.
/// </summary>
/// <param name="pump">Factory for the pump task to await.</param>
private async Task RunPumpAsync(Func<Task> pump)
{
    try
    {
        await pump();
    }
    catch (Exception exception)
    {
        Console.WriteLine(exception);
    }
    finally
    {
        // One pump ending (normally or by failure) means the pipeline is dead: mark the
        // connector as stopped and cancel the shared token so the sibling pump stops too.
        IsRunning = false;
        _cancellationTokenSource.Cancel();
    }
}
/// <summary>
/// Stops the connector by cancelling the shared cancellation token and clearing <see cref="IsRunning"/>.
/// </summary>
public void Stop()
{
    try
    {
        _cancellationTokenSource.Cancel();
    }
    finally
    {
        // Cancel() rethrows exceptions raised by registered callbacks; the connector must
        // still be reported as stopped in that case.
        IsRunning = false;
    }
}
/// <summary>
/// Throws when <see cref="IsConnected"/> is false.
/// </summary>
/// <exception cref="InvalidOperationException">The connector is not connected.</exception>
protected void CheckIsConnectedOrThrowException()
{
    if (!IsConnected)
    {
        // The missing step for a disconnected connector is ConnectAsync(), not Start().
        throw new InvalidOperationException("NetworkConnector isn't connected yet. Call ConnectAsync() first");
    }
}
/// <summary>
/// Throws when <see cref="IsRunning"/> is false.
/// </summary>
/// <exception cref="InvalidOperationException">The connector has not been started.</exception>
protected void CheckIsRunningOrThrowException()
{
    if (!IsRunning)
    {
        // The missing step for a connector that is not running is Start(), not ConnectAsync().
        throw new InvalidOperationException("NetworkConnector isn't started yet. Call Start() first");
    }
}
/// <summary>
/// Implemented by the derived class. Presumably establishes the connection so that
/// <see cref="IsConnected"/> becomes true before <see cref="Start"/> is called — confirm against implementations.
/// </summary>
/// <param name="cancellationToken">Token passed to the implementation.</param>
public abstract ValueTask ConnectAsync(CancellationToken cancellationToken);
/// <summary>
/// Serializes <paramref name="message"/> and sends it as two packets: a header carrying the
/// body size and the full name of <typeparamref name="TMessage"/>, followed by the body.
/// </summary>
/// <typeparam name="TMessage">Static type of the message; its full name is written into the header.</typeparam>
/// <param name="message">Message to send.</param>
/// <param name="cancellationToken">Token forwarded to both packet sends.</param>
/// <remarks>
/// NOTE(review): header and body are sent with two separate awaits and no lock is visible here,
/// so concurrent callers could interleave their packets — confirm callers serialize sends.
/// </remarks>
public async Task SendMessageAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
{
    CheckIsRunningOrThrowException();

    var payload = _messageSerializer.Serialize(message);
    var framing = new Header
    {
        BodySize = payload.Length,
        TypeName = typeof(TMessage).FullName,
    };

    // Header first so the receiver knows how many body bytes follow and which type to build.
    await SendPacketAsync(framing.ToMemory(), cancellationToken);
    await SendPacketAsync(payload, cancellationToken);
}
/// <summary>
/// Waits for the next queued incoming message and returns it cast to <typeparamref name="TMessage"/>.
/// </summary>
/// <typeparam name="TMessage">Type the dequeued message is cast to.</typeparam>
/// <returns>The next message from the internal queue.</returns>
/// <remarks>
/// The wait uses the connector's own cancellation token. The cast is unconditional, so a
/// queued message of a different type makes the cast fail after it has already been dequeued.
/// </remarks>
public async Task<TMessage> ReceiveMessageAsync<TMessage>()
{
    return (TMessage)await _messageQueue.DequeueAsync(_cancellationTokenSource.Token);
}
/// <summary>
/// Implemented by the derived class to send one packet. SendMessageAsync calls this twice per
/// message: once with the header bytes, then once with the body bytes.
/// </summary>
/// <param name="packet">Bytes to send.</param>
/// <param name="cancellationToken">Token supplied by the caller of SendMessageAsync.</param>
protected abstract Task SendPacketAsync(ReadOnlyMemory<byte> packet, CancellationToken cancellationToken);
/// <summary>
/// Implemented by the derived class to receive bytes into <paramref name="memory"/>.
/// </summary>
/// <param name="memory">Destination buffer obtained from the pipe writer by FillPipeAsync.</param>
/// <param name="cancellationToken">The connector's cancellation token.</param>
/// <returns>
/// The count FillPipeAsync advances the pipe writer by (i.e. bytes written into
/// <paramref name="memory"/>); a return value of 0 ends the receive pump.
/// </returns>
protected abstract Task<int> ReceivePacketAsync(Memory<byte> memory, CancellationToken cancellationToken);
/// <summary>
/// Receive pump: repeatedly asks the derived class for incoming bytes, writes them into the
/// pipe and flushes them to the reader, until the source reports 0 bytes, a receive fails,
/// or the reader side has completed.
/// </summary>
/// <param name="pipeWriter">Writer side of the pipe shared with <see cref="ReadPipeAsync"/>.</param>
/// <param name="cancellationToken">Token passed to the receive and flush operations.</param>
private async Task FillPipeAsync(PipeWriter pipeWriter, CancellationToken cancellationToken)
{
    const int minimumBufferSize = 512;
    try
    {
        while (true)
        {
            // Ask the PipeWriter for at least 512 bytes of writable memory.
            var memory = pipeWriter.GetMemory(minimumBufferSize);
            try
            {
                var bytesReceived = await ReceivePacketAsync(memory, cancellationToken);
                if (bytesReceived == 0)
                {
                    break;
                }
                // Tell the PipeWriter how many bytes were actually written into the memory.
                pipeWriter.Advance(bytesReceived);
            }
            catch (Exception exception)
            {
                Console.WriteLine(exception);
                break;
            }
            // Make the data available to the PipeReader.
            var flushResult = await pipeWriter.FlushAsync(cancellationToken);
            if (flushResult.IsCompleted)
            {
                break;
            }
        }
    }
    finally
    {
        // Always tell the PipeReader that no more data is coming, including when
        // FlushAsync throws (for example on cancellation) and the loop is left abnormally.
        pipeWriter.Complete();
    }
}
/// <summary>
/// Parse pump: reads from the pipe, splits the byte stream into header + body frames, and
/// hands each complete body to a thread-pool task that deserializes and queues the message.
/// </summary>
/// <param name="pipeReader">Reader side of the pipe filled by <see cref="FillPipeAsync"/>.</param>
/// <param name="cancellationToken">Token passed to the pipe reads and checked before deserializing.</param>
/// <exception cref="InvalidOperationException">A header announces a negative body size.</exception>
/// <remarks>
/// NOTE(review): every message is deserialized on its own thread-pool task, so messages may be
/// queued in a different order than they arrived — confirm that callers do not rely on ordering.
/// </remarks>
private async Task ReadPipeAsync(PipeReader pipeReader, CancellationToken cancellationToken)
{
    // Header of the frame currently being assembled. It survives across reads when the
    // header has been consumed from the pipe but its body has not fully arrived yet.
    Header? pendingHeader = null;
    try
    {
        while (true)
        {
            var result = await pipeReader.ReadAsync(cancellationToken);
            var buffer = result.Buffer;

            // Extract as many complete frames as the current buffer holds.
            while (true)
            {
                if (pendingHeader is null)
                {
                    // Parse into a local so a failed parse can never leave a stale header behind.
                    if (!Header.TryParseHeader(buffer, out var parsedHeader) || parsedHeader is null)
                    {
                        break;
                    }
                    pendingHeader = parsedHeader;
                    buffer = buffer.Slice(parsedHeader.Value.HeaderSize);
                }

                var header = pendingHeader.Value;
                var bodySize = header.BodySize;
                if (bodySize < 0)
                {
                    // The size comes from the wire; a negative value means the stream is corrupt.
                    throw new InvalidOperationException($"Received a header with an invalid body size ({bodySize}).");
                }
                if (buffer.Length < bodySize)
                {
                    // Body not complete yet: wait for more data before renting a buffer.
                    break;
                }

                // Copy the body out of the pipe so the pipe memory can be released by AdvanceTo
                // while deserialization runs in the background.
                var rentedBody = ArrayPool<byte>.Shared.Rent(bodySize);
                if (!TryCopyBodyBuffer(buffer, bodySize, rentedBody.AsSpan(0, bodySize)))
                {
                    ArrayPool<byte>.Shared.Return(rentedBody);
                    break;
                }
                buffer = buffer.Slice(bodySize);
                pendingHeader = null;

                // The cancellation token is deliberately not passed to Task.Run: a task cancelled
                // before it starts would never run and the rented buffer would never be returned.
                _ = Task.Run(() => DeserializeAndEnqueue(header, rentedBody, cancellationToken));
            }

            // Everything before buffer.Start has been consumed; the rest has been examined
            // and needs more data before it can be parsed.
            pipeReader.AdvanceTo(buffer.Start, buffer.End);

            // Stop reading if there's no more data coming.
            if (result.IsCompleted)
            {
                break;
            }
        }
    }
    finally
    {
        // Mark the PipeReader as complete even when the loop is left by an exception.
        pipeReader.Complete();
    }
}

/// <summary>
/// Deserializes one message body and queues the result; always returns the rented buffer to the pool.
/// </summary>
/// <param name="header">Header of the frame; supplies the type name and the body length.</param>
/// <param name="rentedBody">Pooled array whose first <c>header.BodySize</c> bytes hold the body.</param>
/// <param name="cancellationToken">When already cancelled, the message is dropped without deserializing.</param>
private void DeserializeAndEnqueue(Header header, byte[] rentedBody, CancellationToken cancellationToken)
{
    try
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return;
        }

        var type = GetType(header.TypeName);
        if (type is null)
        {
            Console.WriteLine($"Unknown message type '{header.TypeName}'; message dropped.");
            return;
        }

        var message = _messageSerializer.Deserialize(rentedBody.AsMemory(0, header.BodySize), type);
        _messageQueue.Enqueue(message);
    }
    catch (Exception exception)
    {
        // This runs fire-and-forget, so an escaping exception would go unobserved; log it instead.
        Console.WriteLine(exception);
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(rentedBody);
    }
}
/// <summary>
/// Resolves a type name to a <see cref="Type"/>: first from the static cache, then via
/// <see cref="Type.GetType(string)"/>, and finally by asking every assembly loaded in the
/// current application domain. Successful lookups are cached; misses are not.
/// </summary>
/// <param name="typeName">Type name to resolve.</param>
/// <returns>The resolved type, or null when no lookup finds it.</returns>
private static Type? GetType(string typeName)
{
    if (TypeCaches.TryGetValue(typeName, out var known))
    {
        return known;
    }

    var resolved = Type.GetType(typeName);
    if (resolved is null)
    {
        // Fall back to scanning the loaded assemblies; the first match wins.
        foreach (var candidate in AppDomain.CurrentDomain.GetAssemblies())
        {
            resolved = candidate.GetType(typeName);
            if (!(resolved is null))
            {
                break;
            }
        }
    }

    if (resolved is null)
    {
        return null;
    }

    TypeCaches.TryAdd(typeName, resolved);
    return resolved;
}
/// <summary>
/// Copies the first <paramref name="bodySize"/> bytes of <paramref name="buffer"/> into
/// <paramref name="destination"/> when the buffer holds at least that many bytes.
/// </summary>
/// <param name="buffer">Source sequence, positioned at the start of the body.</param>
/// <param name="bodySize">Number of bytes to copy.</param>
/// <param name="destination">Target span receiving the body bytes.</param>
/// <returns>True when the bytes were copied; false when the buffer is still too short.</returns>
private bool TryCopyBodyBuffer(in ReadOnlySequence<byte> buffer, int bodySize, Span<byte> destination)
{
    var hasWholeBody = buffer.Length >= bodySize;
    if (hasWholeBody)
    {
        var body = buffer.Slice(buffer.Start, bodySize);
        body.CopyTo(destination);
    }
    return hasWholeBody;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment