Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
ITcpSocketClient implementation with disconnected detection
public class BetterTcpSocketClient : ITcpSocketClient
{
private TcpSocketClient _backingTcpSocketClient = new TcpSocketClient();
private readonly Stream _readBuffer = new BlockingMemoryStream();
private CancellationTokenSource _canceller;
public EventHandler<TcpSocketConnectedEventArgs> Connected { get; set; }
public EventHandler<TcpSocketDisconnectedEventArgs> Disconnected { get; set; }
public EventHandler<ErrorEventArgs> Errored { get; set; }
public Stream ReadStream
{
get { return _readBuffer; }
}
public Stream WriteStream
{
get { return _backingTcpSocketClient.WriteStream; }
}
public string RemoteAddress
{
get { return _backingTcpSocketClient.RemoteAddress; }
}
public int RemotePort
{
get { return _backingTcpSocketClient.RemotePort; }
}
public async Task ConnectAsync(string address, int port, bool secure = false)
{
await _backingTcpSocketClient.ConnectAsync(address, port, secure);
Task.Run(() => MonitorSocket())
.ContinueWith(rs =>
{
// because we can't await this, we pump exceptions out through an event handler
var err = new ErrorEventArgs(rs.Exception);
if (Errored != null)
Errored(this, err);
}, TaskContinuationOptions.OnlyOnFaulted);
var evt = new TcpSocketConnectedEventArgs(this);
if (Connected != null)
Connected(this, evt);
}
public async Task DisconnectAsync()
{
_canceller.Cancel();
await _backingTcpSocketClient.DisconnectAsync();
_backingTcpSocketClient = new TcpSocketClient();
var evt = new TcpSocketDisconnectedEventArgs(this);
if (Disconnected != null)
Disconnected(this, evt);
}
public void Dispose()
{
_backingTcpSocketClient.Dispose();
}
private async Task MonitorSocket()
{
var canceller = new CancellationTokenSource();
var token = canceller.Token;
_canceller = canceller;
var read = new byte[1];
while (!token.IsCancellationRequested)
{
var bytesRead = await _backingTcpSocketClient.ReadStream.ReadAsync(read, 0, 1, token);
if (bytesRead > 0)
_readBuffer.Write(read, 0, 1);
else
{
await DisconnectAsync();
_canceller.Cancel();
}
}
}
}
internal class BlockingMemoryStream : MemoryStream
{
private readonly ManualResetEvent _dataReady = new ManualResetEvent(false);
private byte[] _buffer;
private int _count;
private int _offset;
public override void Write(byte[] buffer, int offset, int count)
{
_buffer = buffer;
_offset = offset;
_count = count;
_dataReady.Set();
}
public override int Read(byte[] buffer, int offset, int count)
{
if (_buffer != null)
{
Array.Copy(_buffer, buffer, _count);
_buffer = null;
return _count;
}
// Block until the stream has some more data.
_dataReady.Reset();
_dataReady.WaitOne();
Array.Copy(_buffer, buffer, _count);
_buffer = null;
return _count;
}
}
public class TcpSocketConnectedEventArgs : EventArgs
{
private readonly ITcpSocketClient _socket;
public TcpSocketConnectedEventArgs(ITcpSocketClient socket)
{
_socket = socket;
}
public ITcpSocketClient Socket
{
get { return _socket; }
}
}
public class TcpSocketDisconnectedEventArgs : EventArgs
{
private readonly ITcpSocketClient _socket;
public TcpSocketDisconnectedEventArgs(ITcpSocketClient socket)
{
_socket = socket;
}
public ITcpSocketClient Socket
{
get { return _socket; }
}
}
public class ErrorEventArgs : EventArgs
{
private readonly Exception _exception;
public ErrorEventArgs(Exception exception)
{
_exception = exception;
}
public Exception Exception
{
get { return _exception; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment