Skip to content

Instantly share code, notes, and snippets.

@xamlmonkey
Last active November 20, 2021 20:33
Show Gist options
  • Star 42 You must be signed in to star a gist
  • Fork 13 You must be signed in to fork a gist
  • Save xamlmonkey/4737291 to your computer and use it in GitHub Desktop.
Save xamlmonkey/4737291 to your computer and use it in GitHub Desktop.
Wrapper for the .NET 4.5 ClientWebSocket
using System;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Coe.WebSocketWrapper
{
public class WebSocketWrapper
{
private const int ReceiveChunkSize = 1024;
private const int SendChunkSize = 1024;
private readonly ClientWebSocket _ws;
private readonly Uri _uri;
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private readonly CancellationToken _cancellationToken;
private Action<WebSocketWrapper> _onConnected;
private Action<string, WebSocketWrapper> _onMessage;
private Action<WebSocketWrapper> _onDisconnected;
protected WebSocketWrapper(string uri)
{
_ws = new ClientWebSocket();
_ws.Options.KeepAliveInterval = TimeSpan.FromSeconds(20);
_uri = new Uri(uri);
_cancellationToken = _cancellationTokenSource.Token;
}
/// <summary>
/// Creates a new instance.
/// </summary>
/// <param name="uri">The URI of the WebSocket server.</param>
/// <returns></returns>
public static WebSocketWrapper Create(string uri)
{
return new WebSocketWrapper(uri);
}
/// <summary>
/// Connects to the WebSocket server.
/// </summary>
/// <returns></returns>
public WebSocketWrapper Connect()
{
ConnectAsync();
return this;
}
/// <summary>
/// Set the Action to call when the connection has been established.
/// </summary>
/// <param name="onConnect">The Action to call.</param>
/// <returns></returns>
public WebSocketWrapper OnConnect(Action<WebSocketWrapper> onConnect)
{
_onConnected = onConnect;
return this;
}
/// <summary>
/// Set the Action to call when the connection has been terminated.
/// </summary>
/// <param name="onDisconnect">The Action to call</param>
/// <returns></returns>
public WebSocketWrapper OnDisconnect(Action<WebSocketWrapper> onDisconnect)
{
_onDisconnected = onDisconnect;
return this;
}
/// <summary>
/// Set the Action to call when a messages has been received.
/// </summary>
/// <param name="onMessage">The Action to call.</param>
/// <returns></returns>
public WebSocketWrapper OnMessage(Action<string, WebSocketWrapper> onMessage)
{
_onMessage = onMessage;
return this;
}
/// <summary>
/// Send a message to the WebSocket server.
/// </summary>
/// <param name="message">The message to send</param>
public void SendMessage(string message)
{
SendMessageAsync(message);
}
private async void SendMessageAsync(string message)
{
if (_ws.State != WebSocketState.Open)
{
throw new Exception("Connection is not open.");
}
var messageBuffer = Encoding.UTF8.GetBytes(message);
var messagesCount = (int)Math.Ceiling((double)messageBuffer.Length / SendChunkSize);
for (var i = 0; i < messagesCount; i++)
{
var offset = (SendChunkSize * i);
var count = SendChunkSize;
var lastMessage = ((i + 1) == messagesCount);
if ((count * (i + 1)) > messageBuffer.Length)
{
count = messageBuffer.Length - offset;
}
await _ws.SendAsync(new ArraySegment<byte>(messageBuffer, offset, count), WebSocketMessageType.Text, lastMessage, _cancellationToken);
}
}
private async void ConnectAsync()
{
await _ws.ConnectAsync(_uri, _cancellationToken);
CallOnConnected();
StartListen();
}
private async void StartListen()
{
var buffer = new byte[ReceiveChunkSize];
try
{
while (_ws.State == WebSocketState.Open)
{
var stringResult = new StringBuilder();
WebSocketReceiveResult result;
do
{
result = await _ws.ReceiveAsync(new ArraySegment<byte>(buffer), _cancellationToken);
if (result.MessageType == WebSocketMessageType.Close)
{
await
_ws.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
CallOnDisconnected();
}
else
{
var str = Encoding.UTF8.GetString(buffer, 0, result.Count);
stringResult.Append(str);
}
} while (!result.EndOfMessage);
CallOnMessage(stringResult);
}
}
catch (Exception)
{
CallOnDisconnected();
}
finally
{
_ws.Dispose();
}
}
private void CallOnMessage(StringBuilder stringResult)
{
if (_onMessage != null)
RunInTask(() => _onMessage(stringResult.ToString(), this));
}
private void CallOnDisconnected()
{
if (_onDisconnected != null)
RunInTask(() => _onDisconnected(this));
}
private void CallOnConnected()
{
if (_onConnected != null)
RunInTask(() => _onConnected(this));
}
private static void RunInTask(Action action)
{
Task.Factory.StartNew(action);
}
}
}
@arisawali2014
Copy link

gambar
Hello, i still have struggle when assign on message handler into new function/sub

@bn77144
Copy link

bn77144 commented Oct 12, 2020

This code works fine with ws or wss. However, when sending fastly data, an exception is received. System.InvalidOperationException: 'There is already one outstanding 'SendAsync' call for this WebSocket instance. ReceiveAsync and SendAsync can be called simultaneously, but at most one outstanding operation for each of them is allowed at the same time

I think we must wait end of task on these 2 methods.... await dont wait end of task.....

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment