Skip to content

Instantly share code, notes, and snippets.

@annymsMthd
Created March 3, 2016 20:32
Show Gist options
  • Save annymsMthd/9ffee9f2e7d99d04f9a8 to your computer and use it in GitHub Desktop.
Save annymsMthd/9ffee9f2e7d99d04f9a8 to your computer and use it in GitHub Desktop.
SimpleClient
/// <summary>
/// Association handle that exchanges length-prefixed frames over a <see cref="TcpClient"/>.
/// </summary>
/// <remarks>
/// Wire format: a 4-byte length produced by <c>BitConverter.GetBytes(int)</c> (host byte order),
/// followed by that many payload bytes. Outgoing payloads are queued by <see cref="Write"/> and
/// written by a single background consumer; incoming frames are read with chained asynchronous
/// reads that start once <see cref="SetEventListener"/> has been called.
/// </remarks>
public class SimpleClient : AssociationHandle
{
    /// <summary>Size in bytes of the length prefix that precedes every frame.</summary>
    private const int LengthPrefixSize = 4;

    /// <summary>Largest inbound payload accepted; this is the size of the receive buffer.</summary>
    private const int MaxInboundPayloadSize = 650000;

    /// <summary>Number of reusable write buffers, i.e. the maximum number of pooled writes in flight.</summary>
    private const int PooledWriteBufferCount = 100;

    private readonly TcpClient _client;
    private readonly NetworkStream _stream;
    private readonly byte[] _lengthBuffer = new byte[LengthPrefixSize];
    private readonly byte[] _buffer = new byte[MaxInboundPayloadSize];
    private readonly int _pooledBufferSize;
    private readonly BlockingCollection<ByteString> _outgoingMessages = new BlockingCollection<ByteString>();
    private readonly BlockingCollection<byte[]> _buffers = new BlockingCollection<byte[]>();
    private readonly object _notifyLock = new object();
    private IHandleEventListener _eventListener;
    private bool _disassociatedNotified;

    /// <summary>
    /// Wraps an already connected <paramref name="client"/>, disables Nagle's algorithm on it,
    /// pre-allocates the write buffer pool and starts the background write consumer.
    /// </summary>
    /// <param name="client">Connected TCP client whose stream is used for all reads and writes.</param>
    /// <param name="localAddress">Local address passed to the base handle.</param>
    /// <param name="remoteAddress">Remote address passed to the base handle.</param>
    public SimpleClient(
        TcpClient client,
        Address localAddress,
        Address remoteAddress)
        : base(localAddress, remoteAddress)
    {
        _client = client;
        _client.NoDelay = true;
        _stream = _client.GetStream();

        // Capture the size once so the "does this frame fit a pooled buffer" check stays
        // consistent even if SendBufferSize is changed on the socket later.
        _pooledBufferSize = _client.SendBufferSize;
        for (int i = 0; i < PooledWriteBufferCount; i++)
        {
            _buffers.Add(new byte[_pooledBufferSize]);
        }

        // The consumer blocks while waiting for work, so give it a dedicated thread
        // instead of parking a thread-pool worker for the lifetime of the connection.
        Task.Factory.StartNew(
            QueueProcessor,
            System.Threading.CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Queues <paramref name="payload"/> for asynchronous transmission.
    /// </summary>
    /// <param name="payload">Payload to send; it is framed with a 4-byte length prefix.</param>
    /// <returns>
    /// <c>true</c> when the payload was queued; <c>false</c> when the handle has been
    /// disassociated or the write side has already failed.
    /// </returns>
    public override bool Write(ByteString payload)
    {
        try
        {
            _outgoingMessages.Add(payload);
            return true;
        }
        catch (InvalidOperationException)
        {
            // Thrown once CompleteAdding has been called: the connection is going away.
            return false;
        }
    }

    /// <summary>
    /// Drains the outgoing queue, framing each payload and starting an asynchronous write for it.
    /// Ends when the queue is completed and empty, or on the first write failure.
    /// </summary>
    private void QueueProcessor()
    {
        foreach (var payload in _outgoingMessages.GetConsumingEnumerable())
        {
            var frameLength = LengthPrefixSize + payload.Length;

            // Frames that do not fit a pooled buffer get a one-off buffer instead of
            // overflowing the pooled one; one-off buffers are never added to the pool.
            var pooled = frameLength <= _pooledBufferSize;
            var buffer = pooled ? _buffers.Take() : new byte[frameLength];

            Buffer.BlockCopy(BitConverter.GetBytes(payload.Length), 0, buffer, 0, LengthPrefixSize);
            payload.CopyTo(buffer, LengthPrefixSize);

            try
            {
                _stream.BeginWrite(buffer, 0, frameLength, ar =>
                {
                    try
                    {
                        _stream.EndWrite(ar);
                    }
                    catch (IOException)
                    {
                        NotifyDisassociated();
                    }
                    catch (ObjectDisposedException)
                    {
                        NotifyDisassociated();
                    }
                    finally
                    {
                        // Always hand the buffer back, otherwise failed writes drain the pool.
                        if (pooled)
                        {
                            _buffers.Add(buffer);
                        }
                    }
                }, null);
            }
            catch (ObjectDisposedException)
            {
                StopWriting(buffer, pooled);
                return;
            }
            catch (IOException)
            {
                StopWriting(buffer, pooled);
                return;
            }
        }
    }

    /// <summary>
    /// Cleans up after a write that could not be started: returns the buffer, rejects further
    /// <see cref="Write"/> calls and reports the disassociation.
    /// </summary>
    private void StopWriting(byte[] buffer, bool pooled)
    {
        if (pooled)
        {
            _buffers.Add(buffer);
        }

        _outgoingMessages.CompleteAdding();
        NotifyDisassociated();
    }

    /// <summary>
    /// Stops accepting outgoing payloads and closes the underlying TCP client.
    /// </summary>
    public override void Disassociate()
    {
        // Completing the queue lets the write consumer finish instead of blocking forever.
        _outgoingMessages.CompleteAdding();
        _client.Close();
    }

    /// <summary>
    /// Registers the listener that receives inbound payloads and disassociation events,
    /// then starts reading the first frame.
    /// </summary>
    /// <param name="eventListener">Listener to notify.</param>
    internal void SetEventListener(IHandleEventListener eventListener)
    {
        lock (_notifyLock)
        {
            _eventListener = eventListener;
        }

        ReadLength(0);
    }

    /// <summary>
    /// Reads the remaining bytes of the 4-byte length prefix.
    /// </summary>
    /// <param name="alreadyRead">Number of prefix bytes already in the length buffer.</param>
    private void ReadLength(int alreadyRead)
    {
        ReadExactly(_lengthBuffer, alreadyRead, LengthPrefixSize, OnLengthRead);
    }

    /// <summary>
    /// Validates the decoded frame length and starts reading the payload.
    /// </summary>
    private void OnLengthRead()
    {
        var length = BitConverter.ToInt32(_lengthBuffer, 0);
        if (length < 0 || length > _buffer.Length)
        {
            // The stream is corrupt or the peer is sending frames we cannot buffer;
            // there is no way to resynchronize, so drop the connection.
            _outgoingMessages.CompleteAdding();
            _client.Close();
            NotifyDisassociated();
            return;
        }

        if (length == 0)
        {
            // Nothing to read; a zero-byte read would be indistinguishable from end of stream.
            OnPayloadRead(0);
            return;
        }

        ReadPayload(0, length);
    }

    /// <summary>
    /// Reads the remaining bytes of a payload into the receive buffer.
    /// </summary>
    /// <param name="alreadyRead">Number of payload bytes already in the receive buffer.</param>
    /// <param name="totalLength">Total payload length announced by the length prefix.</param>
    private void ReadPayload(int alreadyRead, int totalLength)
    {
        ReadExactly(_buffer, alreadyRead, totalLength, () => OnPayloadRead(totalLength));
    }

    /// <summary>
    /// Delivers the completed payload to the listener and starts reading the next frame.
    /// </summary>
    private void OnPayloadRead(int totalLength)
    {
        // The payload is copied out before the next read reuses the receive buffer.
        _eventListener.Notify(new InboundPayload(ByteString.CopyFrom(_buffer, 0, totalLength)));
        ReadLength(0);
    }

    /// <summary>
    /// Asynchronously fills <paramref name="target"/> up to <paramref name="totalLength"/> bytes,
    /// re-issuing reads after partial results, and invokes <paramref name="onComplete"/> when done.
    /// Read failures and end of stream are reported as a disassociation.
    /// </summary>
    private void ReadExactly(byte[] target, int alreadyRead, int totalLength, Action onComplete)
    {
        try
        {
            _stream.BeginRead(target, alreadyRead, totalLength - alreadyRead, ar =>
            {
                int bytesRead;
                try
                {
                    bytesRead = _stream.EndRead(ar);
                }
                catch (IOException)
                {
                    NotifyDisassociated();
                    return;
                }
                catch (ObjectDisposedException)
                {
                    NotifyDisassociated();
                    return;
                }

                if (bytesRead == 0)
                {
                    // Zero bytes for a non-empty request means the peer closed the connection;
                    // reading again would only return zero forever.
                    NotifyDisassociated();
                    return;
                }

                var total = alreadyRead + bytesRead;
                if (total == totalLength)
                {
                    onComplete();
                }
                else
                {
                    ReadExactly(target, total, totalLength, onComplete);
                }
            }, null);
        }
        catch (ObjectDisposedException)
        {
            NotifyDisassociated();
        }
        catch (IOException)
        {
            NotifyDisassociated();
        }
    }

    /// <summary>
    /// Notifies the listener of the disassociation at most once. Does nothing while no
    /// listener has been registered yet.
    /// </summary>
    private void NotifyDisassociated()
    {
        IHandleEventListener listener;
        lock (_notifyLock)
        {
            listener = _eventListener;
            if (_disassociatedNotified || listener == null)
            {
                return;
            }

            _disassociatedNotified = true;
        }

        // Invoked outside the lock so listener code never runs while the lock is held.
        listener.Notify(new Disassociated(DisassociateInfo.Shutdown));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment