Skip to content

Instantly share code, notes, and snippets.

@dasannikov
Created November 4, 2019 11:21
Show Gist options
  • Save dasannikov/cde3ad8ab7df1fe0055acd1d7fb54b28 to your computer and use it in GitHub Desktop.
Save dasannikov/cde3ad8ab7df1fe0055acd1d7fb54b28 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace NativeWebSocket {
/// <summary>Handler signature for <see cref="IWebSocket.OnOpen"/>; takes no arguments.</summary>
public delegate void WebSocketOpenEventHandler();
/// <summary>Handler signature for <see cref="IWebSocket.OnMessage"/>.</summary>
/// <param name="data">Bytes of one complete received message.</param>
public delegate void WebSocketMessageEventHandler(byte[] data);
/// <summary>Handler signature for <see cref="IWebSocket.OnMessageStream"/>.</summary>
/// <param name="stream">
/// Stream holding one complete received message. <c>WebSocket.Receive</c> hands the same stream instance to
/// every invocation and truncates it once the handlers return, so copy anything that must outlive the call.
/// </param>
public delegate void WebSocketMessageStreamEventHandler(MemoryStream stream);
/// <summary>Handler signature for <see cref="IWebSocket.OnError"/>.</summary>
/// <param name="errorMsg">Human-readable description of the failure (<c>WebSocket.Connect</c> passes the exception message).</param>
public delegate void WebSocketErrorEventHandler(string errorMsg);
/// <summary>Handler signature for <see cref="IWebSocket.OnClose"/>.</summary>
/// <param name="closeCode">Close code describing why the connection ended.</param>
public delegate void WebSocketCloseEventHandler(WebSocketCloseCode closeCode);
/// <summary>
/// Close codes reported to <see cref="WebSocketCloseEventHandler"/> subscribers.
/// </summary>
/// <remarks>
/// The numeric values appear to follow the WebSocket close status codes of RFC 6455 - confirm against the spec
/// before adding members. Raw codes are converted by <c>WebSocketHelpers.ParseCloseCodeEnum</c>.
/// </remarks>
public enum WebSocketCloseCode {
/* Do NOT use NotSet - its only purpose is to indicate that the close code cannot be parsed. */
/* NOTE(review): WebSocketHelpers.ParseCloseCodeEnum returns Undefined, not NotSet, for codes that are not
   members of this enum; NotSet is only produced when the raw code is literally 0. */
NotSet = 0,
Normal = 1000,
Away = 1001,
ProtocolError = 1002,
UnsupportedData = 1003,
Undefined = 1004,
NoStatus = 1005,
Abnormal = 1006,
InvalidData = 1007,
PolicyViolation = 1008,
TooBig = 1009,
MandatoryExtension = 1010,
ServerError = 1011,
TlsHandshakeFailure = 1015
}
/// <summary>
/// Connection lifecycle states exposed through <see cref="IWebSocket.State"/>.
/// </summary>
/// <remarks>
/// This is the library's own, coarser state enum; <c>WebSocket.State</c> maps the
/// <c>System.Net.WebSockets.WebSocketState</c> values onto these four members.
/// </remarks>
public enum WebSocketState {
// The connection attempt is in progress.
Connecting,
// The connection is established.
Open,
// A close frame has been sent or received (CloseSent / CloseReceived in the underlying socket).
Closing,
// The connection is closed; WebSocket.State also reports this for every other underlying state.
Closed
}
/// <summary>
/// Event-driven view of a WebSocket connection: its current state plus notifications for
/// open, message, error and close.
/// </summary>
public interface IWebSocket {
/// <summary>Current lifecycle state of the connection.</summary>
WebSocketState State { get; }
/// <summary>Raised when the connection has been established.</summary>
event WebSocketOpenEventHandler OnOpen;
/// <summary>Raised for each complete received message, delivered as a byte array.</summary>
event WebSocketMessageEventHandler OnMessage;
/// <summary>Raised for each complete received message, delivered as a <see cref="MemoryStream"/>.</summary>
event WebSocketMessageStreamEventHandler OnMessageStream;
/// <summary>Raised with an error description when the connection fails.</summary>
event WebSocketErrorEventHandler OnError;
/// <summary>Raised with a close code when the connection ends.</summary>
event WebSocketCloseEventHandler OnClose;
}
/// <summary>
/// Translates raw numeric codes into this library's close-code enum and exception types.
/// </summary>
public static class WebSocketHelpers {
    /// <summary>
    /// Converts a raw numeric close code into a <see cref="WebSocketCloseCode"/>.
    /// </summary>
    /// <param name="closeCode">Numeric close code to convert.</param>
    /// <returns>
    /// The enum member whose value equals <paramref name="closeCode"/>, or
    /// <see cref="WebSocketCloseCode.Undefined"/> when no member has that value.
    /// </returns>
    public static WebSocketCloseCode ParseCloseCodeEnum(int closeCode) {
        var isKnownCode = Enum.IsDefined(typeof(WebSocketCloseCode), closeCode);
        return isKnownCode ? (WebSocketCloseCode) closeCode : WebSocketCloseCode.Undefined;
    }

    /// <summary>
    /// Builds (but does not throw) the exception that corresponds to a negative error code.
    /// </summary>
    /// <param name="errorCode">Error code in the range -1..-7; any other value yields an "Unknown error." exception.</param>
    /// <param name="inner">Exception to attach as <see cref="Exception.InnerException"/>; may be null.</param>
    /// <returns>A <see cref="WebSocketException"/> subclass describing the error.</returns>
    public static WebSocketException GetErrorMessageFromCode(int errorCode, Exception inner) {
        // Codes -2..-6 all describe a wrong connection state and differ only in their message,
        // so they pick a message here and share one construction site below.
        string invalidStateMessage;
        switch(errorCode) {
            case -1:
                return new WebSocketUnexpectedException("WebSocket instance not found.", inner);
            case -7:
                return new WebSocketInvalidArgumentException("Cannot close WebSocket. An invalid code was specified or reason is too long.", inner);
            case -2:
                invalidStateMessage = "WebSocket is already connected or in connecting state.";
                break;
            case -3:
                invalidStateMessage = "WebSocket is not connected.";
                break;
            case -4:
                invalidStateMessage = "WebSocket is already closing.";
                break;
            case -5:
                invalidStateMessage = "WebSocket is already closed.";
                break;
            case -6:
                invalidStateMessage = "WebSocket is not in open state.";
                break;
            default:
                return new WebSocketUnexpectedException("Unknown error.", inner);
        }
        return new WebSocketInvalidStateException(invalidStateMessage, inner);
    }
}
/// <summary>
/// Base class for this library's WebSocket errors; it is the declared return type of
/// <c>WebSocketHelpers.GetErrorMessageFromCode</c>.
/// </summary>
public class WebSocketException : Exception {
/// <summary>Creates the exception with no message.</summary>
public WebSocketException() { }
/// <summary>Creates the exception with the given message.</summary>
public WebSocketException(string message) : base(message) { }
/// <summary>Creates the exception with the given message and underlying cause.</summary>
public WebSocketException(string message, Exception inner) : base(message, inner) { }
}
/// <summary>
/// Error that does not fit a more specific category. <c>WebSocketHelpers.GetErrorMessageFromCode</c>
/// produces it for code -1 ("WebSocket instance not found.") and for unrecognized codes.
/// </summary>
public class WebSocketUnexpectedException : WebSocketException {
/// <summary>Creates the exception with no message.</summary>
public WebSocketUnexpectedException() { }
/// <summary>Creates the exception with the given message.</summary>
public WebSocketUnexpectedException(string message) : base(message) { }
/// <summary>Creates the exception with the given message and underlying cause.</summary>
public WebSocketUnexpectedException(string message, Exception inner) : base(message, inner) { }
}
/// <summary>
/// Error caused by an invalid argument. <c>WebSocketHelpers.GetErrorMessageFromCode</c> produces it
/// for code -7 (invalid close code or over-long close reason).
/// </summary>
public class WebSocketInvalidArgumentException : WebSocketException {
/// <summary>Creates the exception with no message.</summary>
public WebSocketInvalidArgumentException() { }
/// <summary>Creates the exception with the given message.</summary>
public WebSocketInvalidArgumentException(string message) : base(message) { }
/// <summary>Creates the exception with the given message and underlying cause.</summary>
public WebSocketInvalidArgumentException(string message, Exception inner) : base(message, inner) { }
}
/// <summary>
/// Error caused by using the socket in the wrong connection state. <c>WebSocketHelpers.GetErrorMessageFromCode</c>
/// produces it for codes -2 through -6 (already connected/connecting, not connected, already closing,
/// already closed, not open).
/// </summary>
public class WebSocketInvalidStateException : WebSocketException {
/// <summary>Creates the exception with no message.</summary>
public WebSocketInvalidStateException() { }
/// <summary>Creates the exception with the given message.</summary>
public WebSocketInvalidStateException(string message) : base(message) { }
/// <summary>Creates the exception with the given message and underlying cause.</summary>
public WebSocketInvalidStateException(string message, Exception inner) : base(message, inner) { }
}
/// <summary>
/// <see cref="IWebSocket"/> implementation on top of <see cref="ClientWebSocket"/>.
/// </summary>
/// <remarks>
/// <see cref="Connect"/> opens the connection and then runs the receive loop until the connection ends,
/// raising the events along the way. <see cref="Send"/> and <see cref="SendText"/> may be called
/// concurrently: only one caller at a time writes to the socket, the others enqueue their message and
/// the active sender flushes the queue in FIFO order.
/// </remarks>
public class WebSocket : IWebSocket {
    /// <summary>An outgoing message waiting for the caller that currently owns the send path.</summary>
    struct PendingMessage {
        public readonly WebSocketMessageType Type;
        public readonly ArraySegment<byte> Buffer;

        public PendingMessage(WebSocketMessageType type, ArraySegment<byte> buffer) {
            Type = type;
            Buffer = buffer;
        }
    }

    // Guards isSending and sendQueue; both must always be read and written together under this lock.
    readonly object Lock = new object();
    // Single queue shared by text and binary messages so their relative order is preserved.
    readonly Queue<PendingMessage> sendQueue = new Queue<PendingMessage>();
    readonly Uri uri;
    // True while some SendMessage call owns the socket's send path.
    bool isSending;
    CancellationToken m_CancellationToken;
    ClientWebSocket m_Socket = new ClientWebSocket();
    CancellationTokenSource m_TokenSource;

    /// <summary>
    /// Creates a socket for the given endpoint. No connection is made until <see cref="Connect"/> is called.
    /// </summary>
    /// <param name="url">Absolute <c>ws://</c> or <c>wss://</c> URL.</param>
    /// <exception cref="ArgumentException">The URL scheme is neither <c>ws</c> nor <c>wss</c>.</exception>
    public WebSocket(string url) {
        uri = new Uri(url);
        var protocol = uri.Scheme;
        if(!protocol.Equals("ws") && !protocol.Equals("wss"))
            throw new ArgumentException("Unsupported protocol: " + protocol);
    }

    /// <summary>Raised by <see cref="Connect"/> once the connection has been established.</summary>
    public event WebSocketOpenEventHandler OnOpen;
    /// <summary>Raised for each complete text or binary message, delivered as a fresh byte array.</summary>
    public event WebSocketMessageEventHandler OnMessage;
    /// <summary>
    /// Raised for each complete text or binary message, delivered as a stream positioned at the start.
    /// The same stream instance is reused and truncated after the handlers return.
    /// </summary>
    public event WebSocketMessageStreamEventHandler OnMessageStream;
    /// <summary>Raised with the exception message when <see cref="Connect"/> fails.</summary>
    public event WebSocketErrorEventHandler OnError;
    /// <summary>Raised when the connection ends, with the peer's close code or <c>Abnormal</c> on failure.</summary>
    public event WebSocketCloseEventHandler OnClose;

    /// <summary>
    /// Current state, mapped from the underlying socket: CloseSent/CloseReceived become
    /// <c>Closing</c>; anything other than Connecting/Open/CloseSent/CloseReceived is reported as <c>Closed</c>.
    /// </summary>
    public WebSocketState State {
        get {
            switch(m_Socket.State) {
                case System.Net.WebSockets.WebSocketState.Connecting:
                    return WebSocketState.Connecting;
                case System.Net.WebSockets.WebSocketState.Open:
                    return WebSocketState.Open;
                case System.Net.WebSockets.WebSocketState.CloseSent:
                case System.Net.WebSockets.WebSocketState.CloseReceived:
                    return WebSocketState.Closing;
                default:
                    return WebSocketState.Closed;
            }
        }
    }

    /// <summary>Cancels the token used by the pending connect/send/receive operations, if any.</summary>
    public void CancelConnection() {
        m_TokenSource?.Cancel();
    }

    /// <summary>
    /// Connects to the endpoint and runs the receive loop until the connection ends.
    /// </summary>
    /// <remarks>
    /// The returned task completes only after the connection is over. Failures are not thrown:
    /// they are reported through <see cref="OnError"/> followed by <see cref="OnClose"/> with <c>Abnormal</c>.
    /// The socket is disposed when the task completes.
    /// </remarks>
    public async Task Connect() {
        try {
            m_TokenSource = new CancellationTokenSource();
            m_CancellationToken = m_TokenSource.Token;
            m_Socket = new ClientWebSocket();
            await m_Socket.ConnectAsync(uri, m_CancellationToken);
            OnOpen?.Invoke();
            await Receive();
        } catch(Exception ex) {
            OnError?.Invoke(ex.Message);
            OnClose?.Invoke(WebSocketCloseCode.Abnormal);
        } finally {
            // Stop any operation still using the token, then release the socket.
            m_TokenSource?.Cancel();
            m_Socket?.Dispose();
        }
    }

    /// <summary>Sends a binary message. Empty arrays are silently ignored.</summary>
    /// <param name="bytes">Payload; must not be null.</param>
    /// <returns>
    /// A task that completes when the message has been sent or queued behind a send already in progress.
    /// </returns>
    public Task Send(byte[] bytes) {
        return SendMessage(WebSocketMessageType.Binary, new ArraySegment<byte>(bytes));
    }

    /// <summary>Sends a text message encoded as UTF-8. Empty strings are silently ignored.</summary>
    /// <param name="message">Text to send; must not be null.</param>
    /// <returns>
    /// A task that completes when the message has been sent or queued behind a send already in progress.
    /// </returns>
    public Task SendText(string message) {
        var encoded = Encoding.UTF8.GetBytes(message);
        return SendMessage(WebSocketMessageType.Text, new ArraySegment<byte>(encoded, 0, encoded.Length));
    }

    /// <summary>
    /// Sends one message, or enqueues it when another caller is already sending.
    /// </summary>
    /// <remarks>
    /// The caller that wins ownership of the send path also flushes everything queued behind it, so a
    /// send failure for a queued message surfaces on the owner's task, not on the task of the caller
    /// that queued it. After a failure, ownership is released and any messages still queued stay queued;
    /// they are flushed by the next successful send.
    /// </remarks>
    async Task SendMessage(WebSocketMessageType messageType, ArraySegment<byte> buffer) {
        // Return control to the calling method immediately.
        await Task.Yield();

        // Make sure we have data.
        if(buffer.Count == 0) {
            return;
        }

        // Decide "send now or enqueue" in ONE critical section. Doing the check and the enqueue under
        // separate lock acquisitions lets the active sender finish draining in between, stranding the message.
        lock(Lock) {
            if(isSending) {
                sendQueue.Enqueue(new PendingMessage(messageType, buffer));
                return;
            }
            isSending = true;
        }

        var current = new PendingMessage(messageType, buffer);
        // Set once TryTakeNext has found the queue empty and released ownership on our behalf.
        var released = false;
        try {
            do {
                // isSending already guarantees a single writer, so the send is simply awaited
                // instead of blocking a thread on Task.Wait inside a monitor.
                await m_Socket.SendAsync(current.Buffer, current.Type, true, m_CancellationToken);
                released = !TryTakeNext(out current);
            } while(!released);
        } finally {
            // A failed or cancelled send must not leave isSending stuck at true, otherwise every later
            // message would be queued forever. Skip this when ownership was already released: another
            // caller may legitimately own the send path by now.
            if(!released) {
                lock(Lock) {
                    isSending = false;
                }
            }
        }
    }

    /// <summary>
    /// Takes the next queued message, or atomically releases send ownership when the queue is empty.
    /// </summary>
    /// <returns>True when <paramref name="next"/> holds a message to send; false when ownership was released.</returns>
    bool TryTakeNext(out PendingMessage next) {
        lock(Lock) {
            if(sendQueue.Count > 0) {
                next = sendQueue.Dequeue();
                return true;
            }
            // Releasing inside the same critical section as the emptiness check closes the window in
            // which a concurrent caller could enqueue a message that nobody would ever flush.
            isSending = false;
            next = default(PendingMessage);
            return false;
        }
    }

    /// <summary>
    /// Receive loop: reads messages while the socket is open and raises the message/close events.
    /// </summary>
    /// <remarks>
    /// Normally invoked by <see cref="Connect"/>. Any exception ends the loop, cancels the connection
    /// token and raises <see cref="OnClose"/> with <c>Abnormal</c>; nothing is thrown to the caller.
    /// </remarks>
    public async Task Receive() {
        var buffer = new ArraySegment<byte>(new byte[8192]);
        // Reused for every message; handed to OnMessageStream subscribers and truncated afterwards.
        var ms = new MemoryStream();
        try {
            while(m_Socket.State == System.Net.WebSockets.WebSocketState.Open) {
                // A message may arrive in several frames; accumulate until EndOfMessage.
                WebSocketReceiveResult result;
                do {
                    result = await m_Socket.ReceiveAsync(buffer, m_CancellationToken);
                    // ReSharper disable once AssignNullToNotNullAttribute
                    ms.Write(buffer.Array, buffer.Offset, result.Count);
                } while(!result.EndOfMessage);
                ms.Seek(0, SeekOrigin.Begin);

                if(result.MessageType == WebSocketMessageType.Close) {
                    // A close frame without a status code is reported as NoStatus rather than
                    // failing on the nullable cast and being misreported as Abnormal.
                    var closeCode = result.CloseStatus.HasValue
                        ? WebSocketHelpers.ParseCloseCodeEnum((int) result.CloseStatus.Value)
                        : WebSocketCloseCode.NoStatus;
                    try {
                        await Close();
                    } catch(Exception) {
                        // Best effort: the peer has already closed, so failing to send our half of the
                        // handshake must not replace the peer's close code with Abnormal.
                    }
                    OnClose?.Invoke(closeCode);
                    break;
                }

                // Text and binary messages are delivered identically.
                OnMessage?.Invoke(ms.ToArray());
                OnMessageStream?.Invoke(ms);
                ms.SetLength(0);
            }
        } catch(Exception) {
            // m_TokenSource is null when Receive is called without a prior Connect.
            m_TokenSource?.Cancel();
            OnClose?.Invoke(WebSocketCloseCode.Abnormal);
        }
    }

    /// <summary>
    /// Closes the connection with a normal-closure status.
    /// </summary>
    /// <remarks>
    /// When the socket is open this starts the close handshake. When the peer has already sent its close
    /// frame (underlying state CloseReceived) this sends the answering close frame so the handshake
    /// completes. In every other state the call does nothing.
    /// </remarks>
    public async Task Close() {
        switch(m_Socket.State) {
            case System.Net.WebSockets.WebSocketState.Open:
                await m_Socket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, m_CancellationToken);
                break;
            case System.Net.WebSockets.WebSocketState.CloseReceived:
                await m_Socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, string.Empty, m_CancellationToken);
                break;
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment