Created
March 3, 2016 20:32
-
-
Save annymsMthd/9ffee9f2e7d99d04f9a8 to your computer and use it in GitHub Desktop.
SimpleClient
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Association handle that exchanges length-prefixed frames over a <see cref="TcpClient"/>.
/// Each frame is a 4-byte length produced by <see cref="BitConverter.GetBytes(int)"/>
/// (platform byte order) followed by that many payload bytes.
/// </summary>
/// <remarks>
/// Outgoing payloads are queued by <see cref="Write"/> and sent by a single background
/// writer. Incoming frames are read with chained asynchronous reads that start once
/// <see cref="SetEventListener"/> has been called. Any stream failure, a remote close,
/// or an invalid length prefix is reported to the listener as a single
/// <c>Disassociated(DisassociateInfo.Shutdown)</c> event.
/// </remarks>
public class SimpleClient : AssociationHandle
{
    /// <summary>Size in bytes of the length prefix that precedes every payload.</summary>
    private const int LengthPrefixSize = 4;

    /// <summary>Largest inbound payload accepted; this is the size of the receive buffer.</summary>
    private const int MaxPayloadSize = 650000;

    /// <summary>Number of reusable write buffers; also bounds the pooled writes in flight.</summary>
    private const int PooledWriteBufferCount = 100;

    private readonly TcpClient _client;
    private readonly NetworkStream _stream;
    private readonly int _pooledBufferSize;
    private readonly byte[] _lengthBuffer = new byte[LengthPrefixSize];
    private readonly byte[] _buffer = new byte[MaxPayloadSize];
    private readonly BlockingCollection<ByteString> _outgoingMessages = new BlockingCollection<ByteString>();
    private readonly BlockingCollection<byte[]> _buffers = new BlockingCollection<byte[]>();

    // Guards _eventListener and _disassociationReported, which are touched from the
    // writer thread, the I/O callbacks and the caller of SetEventListener.
    private readonly object _stateLock = new object();
    private IHandleEventListener _eventListener;
    private bool _disassociationReported;

    /// <summary>
    /// Wraps an already connected client, disables Nagle's algorithm on it, allocates the
    /// pooled write buffers and starts the background writer.
    /// </summary>
    /// <param name="client">Connected TCP client used for all reads and writes.</param>
    /// <param name="localAddress">Local address passed to the base handle.</param>
    /// <param name="remoteAddress">Remote address passed to the base handle.</param>
    /// <exception cref="ArgumentNullException"><paramref name="client"/> is null.</exception>
    public SimpleClient(
        TcpClient client,
        Address localAddress,
        Address remoteAddress)
        : base(localAddress, remoteAddress)
    {
        if (client == null)
        {
            throw new ArgumentNullException("client");
        }

        _client = client;
        _client.NoDelay = true;
        _stream = _client.GetStream();

        _pooledBufferSize = _client.SendBufferSize;
        for (int i = 0; i < PooledWriteBufferCount; i++)
        {
            _buffers.Add(new byte[_pooledBufferSize]);
        }

        // The writer blocks on the queue for the lifetime of the connection, so it is
        // given a long-running task instead of occupying a regular pool thread.
        Task.Factory.StartNew(ProcessOutgoingQueue, TaskCreationOptions.LongRunning);
    }

    /// <summary>Queues a payload to be framed and sent by the background writer.</summary>
    /// <param name="payload">Bytes to send as one frame.</param>
    /// <returns>
    /// <c>true</c> if the payload was queued; <c>false</c> if the connection has already
    /// been shut down and no further writes are accepted.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="payload"/> is null.</exception>
    public override bool Write(ByteString payload)
    {
        if (payload == null)
        {
            throw new ArgumentNullException("payload");
        }

        try
        {
            _outgoingMessages.Add(payload);
            return true;
        }
        catch (InvalidOperationException)
        {
            // The queue was marked complete by a shutdown or a detected failure.
            return false;
        }
    }

    /// <summary>Stops accepting writes and closes the underlying TCP client.</summary>
    public override void Disassociate()
    {
        Shutdown();
    }

    /// <summary>
    /// Registers the listener that receives inbound payloads and the disassociation
    /// event, then starts reading frames from the stream.
    /// </summary>
    /// <param name="eventListener">Listener to notify.</param>
    internal void SetEventListener(IHandleEventListener eventListener)
    {
        lock (_stateLock)
        {
            _eventListener = eventListener;
        }

        ReadLength(0);
    }

    /// <summary>
    /// Writer loop: takes queued payloads, frames them and starts an asynchronous write
    /// for each. Ends when the queue is completed or a write cannot be started.
    /// </summary>
    private void ProcessOutgoingQueue()
    {
        foreach (var payload in _outgoingMessages.GetConsumingEnumerable())
        {
            var frameLength = LengthPrefixSize + payload.Length;

            // Frames that do not fit a pooled buffer get a one-off buffer rather than
            // overflowing the pooled one. Taking from the pool blocks while all pooled
            // buffers are in flight, which throttles the writer.
            var pooled = frameLength <= _pooledBufferSize;
            var frame = pooled ? _buffers.Take() : new byte[frameLength];

            Buffer.BlockCopy(BitConverter.GetBytes(payload.Length), 0, frame, 0, LengthPrefixSize);
            payload.CopyTo(frame, LengthPrefixSize);

            try
            {
                // NOTE(review): several writes may be outstanding at once; this relies on
                // the stream sending them in the order BeginWrite was called — confirm.
                _stream.BeginWrite(frame, 0, frameLength, ar => CompleteWrite(ar, frame, pooled), null);
            }
            catch (ObjectDisposedException)
            {
                NotifyDisassociated();
                return;
            }
            catch (IOException)
            {
                NotifyDisassociated();
                return;
            }
        }
    }

    /// <summary>
    /// Completes an asynchronous write, reports a failure to the listener, and returns
    /// the buffer to the pool when it came from there.
    /// </summary>
    private void CompleteWrite(IAsyncResult ar, byte[] frame, bool pooled)
    {
        try
        {
            _stream.EndWrite(ar);
        }
        catch (ObjectDisposedException)
        {
            NotifyDisassociated();
        }
        catch (IOException)
        {
            NotifyDisassociated();
        }
        finally
        {
            // Returned even on failure so a writer blocked on the pool can wake up,
            // observe the broken stream and exit.
            if (pooled)
            {
                _buffers.Add(frame);
            }
        }
    }

    /// <summary>Reads the remaining bytes of the 4-byte length prefix.</summary>
    /// <param name="alreadyRead">Number of prefix bytes received so far.</param>
    private void ReadLength(int alreadyRead)
    {
        BeginRead(_lengthBuffer, alreadyRead, LengthPrefixSize - alreadyRead, bytesRead =>
        {
            var received = alreadyRead + bytesRead;
            if (received < LengthPrefixSize)
            {
                ReadLength(received);
                return;
            }

            var length = BitConverter.ToInt32(_lengthBuffer, 0);
            if (length < 0 || length > _buffer.Length)
            {
                // The prefix cannot describe a frame that fits the receive buffer, so the
                // stream can no longer be framed reliably: drop the connection.
                Shutdown();
                NotifyDisassociated();
                return;
            }

            if (length == 0)
            {
                // Nothing to read for an empty frame; a zero-byte read would be
                // indistinguishable from the remote side closing the connection.
                DeliverPayload(0);
                return;
            }

            ReadPayload(0, length);
        });
    }

    /// <summary>Reads the remaining bytes of the current payload.</summary>
    /// <param name="alreadyRead">Number of payload bytes received so far.</param>
    /// <param name="totalLength">Payload length announced by the length prefix.</param>
    private void ReadPayload(int alreadyRead, int totalLength)
    {
        BeginRead(_buffer, alreadyRead, totalLength - alreadyRead, bytesRead =>
        {
            var received = alreadyRead + bytesRead;
            if (received < totalLength)
            {
                ReadPayload(received, totalLength);
                return;
            }

            DeliverPayload(totalLength);
        });
    }

    /// <summary>
    /// Hands the first <paramref name="length"/> bytes of the receive buffer to the
    /// listener as a copy, then starts reading the next frame.
    /// </summary>
    private void DeliverPayload(int length)
    {
        _eventListener.Notify(new InboundPayload(ByteString.CopyFrom(_buffer, 0, length)));
        ReadLength(0);
    }

    /// <summary>
    /// Starts one asynchronous read and invokes <paramref name="onRead"/> with the number
    /// of bytes received. Stream failures and a zero-byte read (the remote side closed
    /// the connection) are reported as a disassociation instead.
    /// </summary>
    private void BeginRead(byte[] target, int offset, int count, Action<int> onRead)
    {
        try
        {
            _stream.BeginRead(target, offset, count, ar =>
            {
                int bytesRead;
                try
                {
                    bytesRead = _stream.EndRead(ar);
                }
                catch (IOException)
                {
                    NotifyDisassociated();
                    return;
                }
                catch (ObjectDisposedException)
                {
                    NotifyDisassociated();
                    return;
                }

                if (bytesRead == 0)
                {
                    NotifyDisassociated();
                    return;
                }

                onRead(bytesRead);
            }, null);
        }
        catch (ObjectDisposedException)
        {
            NotifyDisassociated();
        }
        catch (IOException)
        {
            NotifyDisassociated();
        }
    }

    /// <summary>Marks the outgoing queue complete (ending the writer loop) and closes the client.</summary>
    private void Shutdown()
    {
        _outgoingMessages.CompleteAdding();
        _client.Close();
    }

    /// <summary>
    /// Stops accepting writes and raises the disassociation event at most once. If no
    /// listener is registered yet, nothing is raised and the event stays pending so a
    /// later failure can still report it.
    /// </summary>
    private void NotifyDisassociated()
    {
        _outgoingMessages.CompleteAdding();

        IHandleEventListener listener;
        lock (_stateLock)
        {
            if (_disassociationReported || _eventListener == null)
            {
                return;
            }

            _disassociationReported = true;
            listener = _eventListener;
        }

        // Invoked outside the lock so listener code cannot block other notifications.
        listener.Notify(new Disassociated(DisassociateInfo.Shutdown));
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment