Created
April 12, 2019 13:46
-
-
Save namse/b26f7317cf63e5d63027e20ddefd444f to your computer and use it in GitHub Desktop.
BaseNetworkConnector using System.IO.Pipelines
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Buffers; | |
using System.Collections.Concurrent; | |
using System.IO.Pipelines; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using MessageSerializer; | |
namespace NetworkConnector.Connector
{
    /// <summary>
    /// Base class for a message-oriented network connector built on <see cref="System.IO.Pipelines"/>.
    /// </summary>
    /// <remarks>
    /// Each message travels as a <c>Header</c> (carrying the body size and the message type name)
    /// followed by the serialized body. Derived classes supply the transport by implementing
    /// <see cref="ConnectAsync"/>, <see cref="SendPacketAsync"/> and <see cref="ReceivePacketAsync"/>.
    /// After <see cref="Start"/> two background loops run: one copies received bytes into a pipe,
    /// the other parses complete messages out of the pipe and queues them for
    /// <see cref="ReceiveMessageAsync{TMessage}"/>.
    /// </remarks>
    public abstract class BaseNetworkConnector
    {
        /// <summary>Minimum number of bytes requested from the pipe writer for each receive call.</summary>
        private const int MinimumReceiveBufferSize = 512;

        /// <summary>Process-wide cache of message type names that were already resolved to a <see cref="Type"/>.</summary>
        private static readonly ConcurrentDictionary<string, Type> TypeCaches = new ConcurrentDictionary<string, Type>();

        private readonly BaseMessageSerializer _messageSerializer;

        // Single token source shared by both background loops and by ReceiveMessageAsync.
        // It is never recreated, so a connector that has been stopped cannot be started again.
        private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();

        // Deserialized messages waiting to be handed out by ReceiveMessageAsync.
        private readonly AsyncConcurrentQueue<object> _messageQueue = new AsyncConcurrentQueue<object>();

        // Volatile because the background loops clear the flag from thread-pool threads.
        private volatile bool _isRunning;

        /// <summary>
        /// Gets a value indicating whether the background loops have been started and not yet stopped.
        /// </summary>
        public bool IsRunning
        {
            get => _isRunning;
            private set => _isRunning = value;
        }

        /// <summary>Gets a value indicating whether the underlying transport is connected.</summary>
        public abstract bool IsConnected { get; }

        /// <summary>Initializes the connector with the serializer used for message bodies.</summary>
        /// <param name="messageSerializer">Serializer used to encode outgoing and decode incoming bodies.</param>
        /// <exception cref="ArgumentNullException"><paramref name="messageSerializer"/> is <c>null</c>.</exception>
        protected BaseNetworkConnector(BaseMessageSerializer messageSerializer)
        {
            _messageSerializer = messageSerializer ?? throw new ArgumentNullException(nameof(messageSerializer));
        }

        /// <summary>
        /// Starts the background receive and parse loops. Does nothing if they are already running.
        /// </summary>
        /// <exception cref="InvalidOperationException">The connector is not connected.</exception>
        public void Start()
        {
            CheckIsConnectedOrThrowException();

            // A second pair of loops would compete for the same incoming bytes.
            if (IsRunning)
            {
                return;
            }

            // NOTE(review): the pipe uses default PipeOptions, which apply writer back-pressure.
            // The parse loop leaves an incomplete body unconsumed, so a body larger than the pause
            // threshold may stall on older System.IO.Pipelines versions - confirm for the version in use.
            var pipe = new Pipe();
            var cancellationToken = _cancellationTokenSource.Token;

            // Set the flag before launching: a loop that ends immediately clears it again via Stop().
            IsRunning = true;
            _ = Task.Run(() => RunPumpAsync(() => FillPipeAsync(pipe.Writer, cancellationToken)));
            _ = Task.Run(() => RunPumpAsync(() => ReadPipeAsync(pipe.Reader, cancellationToken)));
        }

        /// <summary>
        /// Cancels the background loops and any pending <see cref="ReceiveMessageAsync{TMessage}"/> call
        /// that observes the connector's token, and marks the connector as not running.
        /// </summary>
        public void Stop()
        {
            _cancellationTokenSource.Cancel();
            IsRunning = false;
        }

        /// <summary>Throws when the transport is not connected.</summary>
        /// <exception cref="InvalidOperationException"><see cref="IsConnected"/> is <c>false</c>.</exception>
        protected void CheckIsConnectedOrThrowException()
        {
            if (!IsConnected)
            {
                throw new InvalidOperationException("NetworkConnector isn't connected yet. Call ConnectAsync() first");
            }
        }

        /// <summary>Throws when the background loops have not been started.</summary>
        /// <exception cref="InvalidOperationException"><see cref="IsRunning"/> is <c>false</c>.</exception>
        protected void CheckIsRunningOrThrowException()
        {
            if (!IsRunning)
            {
                throw new InvalidOperationException("NetworkConnector isn't started yet. Call Start() first");
            }
        }

        /// <summary>Establishes the underlying connection.</summary>
        /// <param name="cancellationToken">Token used to cancel the connection attempt.</param>
        public abstract ValueTask ConnectAsync(CancellationToken cancellationToken);

        /// <summary>
        /// Serializes <paramref name="message"/> and sends it as a header packet followed by a body packet.
        /// </summary>
        /// <remarks>
        /// The header records the static type <typeparamref name="TMessage"/>, not the runtime type of
        /// <paramref name="message"/>. Header and body are written with two separate
        /// <see cref="SendPacketAsync"/> calls, so concurrent callers are not serialized against each other here.
        /// </remarks>
        /// <typeparam name="TMessage">Message type whose full name is written into the header.</typeparam>
        /// <param name="message">Message to send.</param>
        /// <param name="cancellationToken">Token forwarded to both packet sends.</param>
        /// <exception cref="InvalidOperationException">The connector has not been started.</exception>
        public async Task SendMessageAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
        {
            CheckIsRunningOrThrowException();

            var bodyBuffer = _messageSerializer.Serialize(message);
            var header = new Header
            {
                BodySize = bodyBuffer.Length,
                TypeName = typeof(TMessage).FullName,
            };

            await SendPacketAsync(header.ToMemory(), cancellationToken);
            await SendPacketAsync(bodyBuffer, cancellationToken);
        }

        /// <summary>
        /// Waits for the next queued message and casts it to <typeparamref name="TMessage"/>.
        /// </summary>
        /// <typeparam name="TMessage">Expected type of the next message.</typeparam>
        /// <returns>The next message taken from the queue.</returns>
        /// <exception cref="InvalidCastException">The dequeued message is not a <typeparamref name="TMessage"/>.</exception>
        public async Task<TMessage> ReceiveMessageAsync<TMessage>()
        {
            var message = await _messageQueue.DequeueAsync(_cancellationTokenSource.Token);
            return (TMessage)message;
        }

        /// <summary>Writes one packet to the transport.</summary>
        /// <param name="packet">Bytes to send.</param>
        /// <param name="cancellationToken">Token used to cancel the send.</param>
        protected abstract Task SendPacketAsync(ReadOnlyMemory<byte> packet, CancellationToken cancellationToken);

        /// <summary>Reads available bytes from the transport into <paramref name="memory"/>.</summary>
        /// <param name="memory">Destination buffer.</param>
        /// <param name="cancellationToken">Token used to cancel the receive.</param>
        /// <returns>The number of bytes written; <c>0</c> makes the receive loop stop.</returns>
        protected abstract Task<int> ReceivePacketAsync(Memory<byte> memory, CancellationToken cancellationToken);

        /// <summary>
        /// Runs one background loop, logs any exception it throws, and always stops the connector
        /// afterwards so the other loop and <see cref="IsRunning"/> never outlive it.
        /// </summary>
        private async Task RunPumpAsync(Func<Task> pump)
        {
            try
            {
                await pump();
            }
            catch (Exception exception)
            {
                Console.WriteLine(exception);
            }
            finally
            {
                Stop();
            }
        }

        /// <summary>
        /// Copies bytes from <see cref="ReceivePacketAsync"/> into the pipe until the transport reports
        /// zero bytes, a receive fails, or the reader side completes. The writer is always completed.
        /// </summary>
        private async Task FillPipeAsync(PipeWriter pipeWriter, CancellationToken cancellationToken)
        {
            try
            {
                while (true)
                {
                    var memory = pipeWriter.GetMemory(MinimumReceiveBufferSize);
                    try
                    {
                        var bytesReceived = await ReceivePacketAsync(memory, cancellationToken);
                        if (bytesReceived == 0)
                        {
                            break;
                        }

                        // Tell the writer how many bytes of the rented memory were filled.
                        pipeWriter.Advance(bytesReceived);
                    }
                    catch (Exception exception)
                    {
                        Console.WriteLine(exception);
                        break;
                    }

                    // Make the data available to the reader.
                    var flushResult = await pipeWriter.FlushAsync(cancellationToken);
                    if (flushResult.IsCompleted)
                    {
                        break;
                    }
                }
            }
            finally
            {
                // Signal end-of-data even when FlushAsync throws (for example on cancellation).
                pipeWriter.Complete();
            }
        }

        /// <summary>
        /// Reads from the pipe, extracts every complete message and queues it, until the writer
        /// completes. The reader is always completed.
        /// </summary>
        private async Task ReadPipeAsync(PipeReader pipeReader, CancellationToken cancellationToken)
        {
            // Header of a message whose body has not fully arrived yet; survives across reads.
            Header? pendingHeader = null;

            try
            {
                while (true)
                {
                    var result = await pipeReader.ReadAsync(cancellationToken);
                    var unread = ConsumeMessages(result.Buffer, ref pendingHeader);

                    // Everything before unread.Start is consumed; everything up to the end was examined.
                    pipeReader.AdvanceTo(unread.Start, unread.End);

                    if (result.IsCompleted)
                    {
                        break;
                    }
                }
            }
            finally
            {
                pipeReader.Complete();
            }
        }

        /// <summary>
        /// Processes as many complete messages as <paramref name="buffer"/> contains.
        /// </summary>
        /// <param name="buffer">Unconsumed bytes returned by the pipe reader.</param>
        /// <param name="pendingHeader">
        /// Header already parsed for the message at the front of the buffer, or <c>null</c> when the
        /// next bytes start with a header. Updated in place.
        /// </param>
        /// <returns>The part of <paramref name="buffer"/> that was not consumed.</returns>
        private ReadOnlySequence<byte> ConsumeMessages(ReadOnlySequence<byte> buffer, ref Header? pendingHeader)
        {
            while (true)
            {
                if (pendingHeader is null)
                {
                    if (!Header.TryParseHeader(buffer, out pendingHeader) || pendingHeader is null)
                    {
                        break;
                    }

                    // The header is consumed immediately; it is remembered in pendingHeader.
                    buffer = buffer.Slice(pendingHeader.Value.HeaderSize);
                }

                var header = pendingHeader.Value;

                // Wait for more data before renting a buffer for a body that is still incomplete.
                if (buffer.Length < header.BodySize)
                {
                    break;
                }

                // Deserialize inline (not on a separate task) so messages are queued in wire order.
                DeserializeAndEnqueue(buffer.Slice(0, header.BodySize), header);

                buffer = buffer.Slice(header.BodySize);
                pendingHeader = null;
            }

            return buffer;
        }

        /// <summary>
        /// Copies <paramref name="body"/> into a pooled buffer, deserializes it as the type named in
        /// <paramref name="header"/> and queues the result. A failure is logged and the message is dropped.
        /// </summary>
        private void DeserializeAndEnqueue(ReadOnlySequence<byte> body, Header header)
        {
            var rentedBuffer = ArrayPool<byte>.Shared.Rent(header.BodySize);
            try
            {
                // Rent may return a larger array, so restrict the view to the body length.
                var bodyMemory = rentedBuffer.AsMemory(0, header.BodySize);
                body.CopyTo(bodyMemory.Span);

                var type = ResolveMessageType(header.TypeName);
                var message = _messageSerializer.Deserialize(bodyMemory, type);
                _messageQueue.Enqueue(message);
            }
            catch (Exception exception)
            {
                Console.WriteLine(exception);
            }
            finally
            {
                ArrayPool<byte>.Shared.Return(rentedBuffer);
            }
        }

        /// <summary>
        /// Resolves a type name from a message header, first with <see cref="Type.GetType(string)"/> and
        /// then by probing every assembly loaded in the current <see cref="AppDomain"/>. Hits are cached.
        /// </summary>
        /// <param name="typeName">Full type name taken from the header.</param>
        /// <returns>The resolved type, or <c>null</c> when no loaded assembly defines it.</returns>
        private static Type? ResolveMessageType(string typeName)
        {
            // SECURITY NOTE(review): typeName comes from the remote peer. Any type in any loaded
            // assembly can be selected for deserialization; consider restricting to an allow-list.
            if (TypeCaches.TryGetValue(typeName, out var cachedType))
            {
                return cachedType;
            }

            var resolvedType = Type.GetType(typeName);
            if (resolvedType is null)
            {
                foreach (var assembly in AppDomain.CurrentDomain.GetAssemblies())
                {
                    resolvedType = assembly.GetType(typeName);
                    if (!(resolvedType is null))
                    {
                        break;
                    }
                }
            }

            if (resolvedType is null)
            {
                // Misses are not cached: the defining assembly may be loaded later.
                return null;
            }

            TypeCaches.TryAdd(typeName, resolvedType);
            return resolvedType;
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment