Skip to content

Instantly share code, notes, and snippets.

@ahancock1
Last active May 1, 2020 20:41
Show Gist options
  • Save ahancock1/e6111e6bc2fd6177b8c1e8f2b45699a9 to your computer and use it in GitHub Desktop.
Save ahancock1/e6111e6bc2fd6177b8c1e8f2b45699a9 to your computer and use it in GitHub Desktop.
Simple ClientWebSocket implementation in C#
/// <summary>
/// Contract for an event-driven socket client: it can be connected, closed,
/// asked to send text, and observed through lifecycle, data and error events.
/// </summary>
public interface ISocket : IDisposable
{
    /// <summary>
    /// Starts connecting the socket.
    /// </summary>
    /// <param name="token">Token that can be used to cancel the attempt.</param>
    /// <returns>
    /// A task producing a boolean; the <c>Socket</c> implementation in this file
    /// returns whether the socket is connected once the attempt has finished.
    /// </returns>
    Task<bool> ConnectAsync(CancellationToken token = default);

    /// <summary>
    /// Closes the socket.
    /// </summary>
    /// <returns>A task that completes when the close operation has finished.</returns>
    Task CloseAsync();

    /// <summary>
    /// Hands a text payload to the socket for sending.
    /// </summary>
    /// <param name="data">The text to send.</param>
    void Send(string data);

    /// <summary>Raised when the socket has connected.</summary>
    event Action OnConnected;

    /// <summary>Raised when the socket has been closed.</summary>
    event Action OnDisconnected;

    /// <summary>
    /// Raised with the payload of a received message. The payload is typed
    /// <c>dynamic</c>; its concrete type is chosen by the implementation.
    /// </summary>
    event Action<dynamic> OnData;

    /// <summary>Raised with the exception describing a failure.</summary>
    event Action<Exception> OnError;
}
/// <summary>
/// <see cref="ISocket"/> implementation built on <see cref="ClientWebSocket"/>.
/// Outgoing text is queued in a channel and written by a background send loop;
/// incoming messages are read by a background receive loop, parsed as JSON
/// objects and published through <see cref="OnData"/>. When the receive loop
/// ends without a close having been requested, the connection is torn down and,
/// if enabled in the options, re-established.
/// </summary>
public class Socket : Disposable, ISocket
{
    private readonly ILogger _logger = Logger.Create<Socket>();
    private readonly string _endpoint;

    // Outgoing queue: Send() may be called by any number of writers, while a
    // single send loop drains it.
    private readonly Channel<string> _messages = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions
        {
            SingleReader = true,
            SingleWriter = false
        });

    private readonly SocketOptions _options;

    // Serializes connect and close so the three connection fields below are
    // always replaced or released together.
    private readonly SemaphoreSlim _sync = new SemaphoreSlim(1, 1);

    private ClientWebSocket _socket;
    private CancellationTokenSource _source;
    private CancellationTokenSource _linked;

    // True once CloseAsync or Dispose has been called; reset by ConnectAsync.
    // Background loops read it to decide whether cleanup/reconnect is wanted.
    private volatile bool _closeRequested;

    /// <summary>Raised with the exception when connecting, sending, receiving or parsing fails.</summary>
    public event Action<Exception> OnError;

    /// <summary>Raised with the parsed JSON object of every received message.</summary>
    public event Action<dynamic> OnData;

    /// <summary>Raised after a connection has been closed and its resources released.</summary>
    public event Action OnDisconnected;

    /// <summary>Raised after a connection attempt left the socket in the open state.</summary>
    public event Action OnConnected;

    /// <summary>
    /// Creates a socket for the given endpoint. No connection is made until
    /// <see cref="ConnectAsync"/> is called.
    /// </summary>
    /// <param name="endpoint">URI string of the web socket endpoint.</param>
    /// <param name="options">Wrapper whose <c>Value</c> supplies the socket options.</param>
    public Socket(string endpoint, IOptions<SocketOptions> options)
    {
        _endpoint = endpoint;
        _options = options.Value;
    }

    /// <summary>True while the underlying web socket exists and is in the open state.</summary>
    public bool IsConnected => _socket?.State == WebSocketState.Open;

    /// <summary>
    /// Connects to the endpoint and starts the background send and receive loops.
    /// Returns immediately when the socket is already connected.
    /// </summary>
    /// <param name="token">
    /// Cancels the attempt and, later, the loops started by it. Cancellation while
    /// waiting for the internal lock surfaces as <see cref="OperationCanceledException"/>;
    /// cancellation during the connect itself is swallowed.
    /// </param>
    /// <returns>Whether the socket is connected when the attempt has finished.</returns>
    public async Task<bool> ConnectAsync(CancellationToken token = default)
    {
        ThrowIfDisposed(nameof(Socket));

        // An explicit connect overrides an earlier explicit close.
        _closeRequested = false;

        return await ConnectCoreAsync(token, false);
    }

    /// <summary>
    /// Queues a text message for the send loop.
    /// </summary>
    /// <param name="data">The text to send.</param>
    public void Send(string data)
    {
        ThrowIfDisposed(nameof(Socket));
        _messages.Writer.TryWrite(data);
    }

    /// <summary>
    /// Closes the current connection, if any, and raises <see cref="OnDisconnected"/>.
    /// Safe to call when the socket was never connected or is already closed;
    /// also prevents a pending automatic reconnect.
    /// </summary>
    public async Task CloseAsync()
    {
        ThrowIfDisposed(nameof(Socket));

        _closeRequested = true;

        await CloseCoreAsync(null);
    }

    /// <summary>
    /// Cancels the background loops and releases the socket, the cancellation
    /// sources and the lock. Tolerates a socket that was never connected.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            _closeRequested = true;

            try
            {
                ReleaseCancellation();
            }
            catch (Exception e)
            {
                // Dispose must not throw; a failing cancellation callback is only logged.
                _logger.LogError(e, "Error cancelling socket operations.");
            }

            _socket?.Dispose();
            _sync.Dispose();
        }
    }

    // Connect implementation shared by ConnectAsync and the reconnect loop.
    private async Task<bool> ConnectCoreAsync(CancellationToken token, bool reconnecting)
    {
        Exception failure = null;
        var opened = false;

        await _sync.WaitAsync(token);
        try
        {
            if (IsConnected)
            {
                return true;
            }

            // A close requested while a reconnect was pending wins over the reconnect.
            if (reconnecting && _closeRequested)
            {
                return false;
            }

            // Release whatever a previous (failed or dropped) attempt left behind
            // instead of overwriting the fields and leaking the old instances.
            ReleaseCancellation();
            _socket?.Dispose();

            var socket = new ClientWebSocket
            {
                Options =
                {
                    KeepAliveInterval = TimeSpan.FromSeconds(20)
                }
            };

            _socket = socket;
            _source = new CancellationTokenSource();
            _linked = CancellationTokenSource
                .CreateLinkedTokenSource(_source.Token, token);

            await socket.ConnectAsync(new Uri(_endpoint), token);

            // The receive loop observes only the caller's token so that the close
            // handshake, not cancellation, ends it on an explicit close.
            ReceiveData(socket, token);
            SendData(socket, _linked.Token);

            opened = IsConnected;
        }
        catch (OperationCanceledException)
        {
            /* Cancelled by the caller: reported through the return value. */
        }
        catch (Exception e)
        {
            _logger.LogError(e, $"Error connecting to endpoint: {_endpoint}");
            failure = e;
        }
        finally
        {
            _sync.Release();
        }

        // Events are raised after the lock is released so handlers may call back
        // into ConnectAsync/CloseAsync.
        if (failure != null)
        {
            HandleError(failure);
        }
        else if (opened)
        {
            HandleConnected();
        }

        return IsConnected;
    }

    // Closes the current connection. When 'expected' is given, the close only
    // happens if that socket is still the current one, so a finished receive loop
    // can never tear down a newer connection. Returns true when a connection was
    // actually closed by this call.
    private async Task<bool> CloseCoreAsync(ClientWebSocket expected)
    {
        await _sync.WaitAsync();
        try
        {
            var socket = _socket;
            if (socket == null || (expected != null && !ReferenceEquals(socket, expected)))
            {
                return false;
            }

            _logger.LogInformation($"Socket closed to endpoint: {_endpoint}");

            _socket = null;
            try
            {
                ReleaseCancellation();

                if (socket.State == WebSocketState.Open)
                {
                    await socket.CloseAsync(
                        WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
                }
            }
            finally
            {
                socket.Dispose();
            }
        }
        finally
        {
            _sync.Release();
        }

        OnDisconnected?.Invoke();
        return true;
    }

    // Cancels and disposes the cancellation sources of the current connection and
    // clears the fields, so a second call (or Dispose) never touches a disposed source.
    private void ReleaseCancellation()
    {
        var source = _source;
        var linked = _linked;
        _source = null;
        _linked = null;

        try
        {
            source?.Cancel();
        }
        finally
        {
            linked?.Dispose();
            source?.Dispose();
        }
    }

    // Starts a background loop that retries connecting until it succeeds, the
    // token is cancelled or a close is requested. Does nothing when automatic
    // reconnect is disabled in the options.
    private void TryReconnect(CancellationToken token)
    {
        if (!_options.AutoReconnect || _closeRequested || token.IsCancellationRequested)
        {
            return;
        }

        Task.Run(async () =>
        {
            // Failures must be handled inside the task: nothing awaits it.
            try
            {
                while (!token.IsCancellationRequested && !_closeRequested)
                {
                    _logger.LogInformation(
                        $"Attempting to reconnect to endpoint: {_endpoint}");

                    if (await ConnectCoreAsync(token, true))
                    {
                        break;
                    }

                    await Task.Delay(_options.AutoReconnectDelay, token);
                }
            }
            catch (OperationCanceledException)
            {
                /* Reconnect cancelled by the caller. */
            }
            catch (ObjectDisposedException)
            {
                /* Socket disposed while a reconnect was pending. */
            }
        });
    }

    private void HandleConnected()
    {
        _logger.LogInformation($"Socket connected to endpoint: {_endpoint}");
        OnConnected?.Invoke();
    }

    // Parses the message as a JSON object and invokes every OnData subscriber on
    // the thread pool. One failing subscriber does not affect the others.
    private void HandleData(string message)
    {
        var delegates = OnData?.GetInvocationList();
        if (delegates == null)
        {
            return;
        }

        Task.Run(() =>
        {
            JObject data;
            try
            {
                data = JObject.Parse(message);
            }
            catch (Exception e)
            {
                // Previously lost as an unobserved task exception.
                _logger.LogError(e, "Error parsing message.");
                HandleError(e);
                return;
            }

            Parallel.ForEach(delegates, handler =>
            {
                try
                {
                    ((Action<dynamic>) handler)(data);
                }
                catch (Exception e)
                {
                    _logger.LogError(e, "Error in data handler.");
                }
            });
        });
    }

    private void HandleError(Exception exception)
    {
        OnError?.Invoke(exception);
    }

    // Background loop that drains the outgoing queue into the given socket until
    // the token is cancelled or the socket leaves the open state.
    private void SendData(ClientWebSocket socket, CancellationToken token)
    {
        Task.Run(async () =>
        {
            try
            {
                var reader = _messages.Reader;
                while (await reader.WaitToReadAsync(token))
                {
                    if (socket.State != WebSocketState.Open)
                    {
                        break;
                    }

                    if (reader.TryRead(out var item))
                    {
                        var data = new ArraySegment<byte>(Encoding.UTF8.GetBytes(item));
                        await socket
                            .SendAsync(data, WebSocketMessageType.Text, true, token);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                /* Connection closed or caller cancelled. */
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Error sending message.");
                HandleError(e);
            }
        }, token);
    }

    // Background loop that reads complete messages from the given socket and
    // dispatches them. When it ends, the connection is cleaned up and a
    // reconnect is attempted unless a close was requested.
    private void ReceiveData(ClientWebSocket socket, CancellationToken token)
    {
        // No token is passed to Task.Run: the body must always run so that the
        // cleanup in 'finally' happens even if the token is already cancelled.
        Task.Run(async () =>
        {
            try
            {
                await using var stream = new MemoryStream();
                var buffer = new ArraySegment<byte>(new byte[_options.ReceiveBufferSize]);

                while (!token.IsCancellationRequested && socket.State == WebSocketState.Open)
                {
                    stream.Seek(0, SeekOrigin.Begin);
                    var count = 0;

                    WebSocketReceiveResult result;
                    do
                    {
                        result = await socket.ReceiveAsync(buffer, token);
                        if (result.MessageType == WebSocketMessageType.Close)
                        {
                            // A close frame carries no application message: stop
                            // here instead of dispatching an empty/partial payload.
                            return;
                        }

                        stream.Write(buffer.Array, buffer.Offset, result.Count);
                        count += result.Count;
                    } while (!result.EndOfMessage);

                    var message = Encoding.UTF8.GetString(stream.GetBuffer(), 0, count);
                    HandleData(message);
                }
            }
            catch (OperationCanceledException)
            {
                /* Caller cancelled. */
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Error receiving message.");
                HandleError(e);
            }
            finally
            {
                await FinishConnectionAsync(socket, token);
            }
        });
    }

    // Cleanup performed when a receive loop ends: closes that loop's connection
    // (if it is still the current one) and then tries to reconnect. Skipped when
    // a close was requested, because CloseAsync/Dispose own the cleanup then.
    private async Task FinishConnectionAsync(ClientWebSocket socket, CancellationToken token)
    {
        if (_closeRequested)
        {
            return;
        }

        bool closed;
        try
        {
            closed = await CloseCoreAsync(socket);
        }
        catch (ObjectDisposedException)
        {
            // Disposed concurrently: nothing left to clean up or reconnect.
            return;
        }
        catch (Exception e)
        {
            _logger.LogError(e, "Error closing socket.");
            HandleError(e);

            // The connection is gone regardless, so a reconnect is still wanted.
            closed = true;
        }

        if (closed)
        {
            TryReconnect(token);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment