Skip to content

Instantly share code, notes, and snippets.

@byt3bl33d3r
Created August 31, 2020 19:41
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save byt3bl33d3r/266cf541b34d73624fd19c085e3220d3 to your computer and use it in GitHub Desktop.
Save byt3bl33d3r/266cf541b34d73624fd19c085e3220d3 to your computer and use it in GitHub Desktop.
Async websocket C# client (producer/consumer pattern)
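/*
Requires a reference to the System.Web.Extensions assembly, which provides
System.Web.Script.Serialization.JavaScriptSerializer (used to build the outgoing JSON).
*/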
using System;
using System.Collections.Concurrent;
using System.Web.Script.Serialization;
using System.Text;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
namespace WebSocketsTest
{
/// <summary>
/// Console WebSocket client built around two queues: a receive task pushes incoming
/// messages onto <see cref="recvQueue"/>, <see cref="Main"/> processes them and pushes
/// replies onto <see cref="sendQueue"/>, and a send task writes those replies to the socket.
/// </summary>
class Program
{
    /// <summary>Messages read from the socket by <see cref="Recv"/>, consumed by <see cref="Main"/>.</summary>
    public static BlockingCollection<string> recvQueue = new BlockingCollection<string>();

    /// <summary>Messages produced by <see cref="Main"/>, written to the socket by <see cref="Send"/>.</summary>
    public static BlockingCollection<string> sendQueue = new BlockingCollection<string>();

    /// <summary>Identifier generated once per process; appended to the server URL and sent in every message.</summary>
    public static Guid ClientId = Guid.NewGuid();

    /// <summary>
    /// Connects to the server, starts the send/receive tasks and answers every received
    /// message with a serialized <see cref="Message"/> until the receive side ends.
    /// </summary>
    /// <param name="args">args[0] must be the absolute base URL of the server; the client id is resolved against it.</param>
    static async Task Main(string[] args)
    {
        Uri baseUrl;
        if (args == null || args.Length == 0 || !Uri.TryCreate(args[0], UriKind.Absolute, out baseUrl))
        {
            Console.Error.WriteLine("Usage: pass the absolute WebSocket server base URL (e.g. ws://host:port/) as the first argument.");
            Environment.ExitCode = 1;
            return;
        }

        Uri serverUrl = new Uri(baseUrl, ClientId.ToString());

        using (var cts = new CancellationTokenSource())
        using (ClientWebSocket ws = new ClientWebSocket())
        {
            CancellationToken token = cts.Token;

            Console.WriteLine("Connecting to {0}", serverUrl.ToString());
            await ws.ConnectAsync(serverUrl, token);

            // Task.Run keeps the blocking queue waits inside Send off the caller's thread.
            Task sendTask = Task.Run(() => Send(ws, token));
            Task recvTask = Task.Run(() => Recv(ws, token));

            try
            {
                // Recv marks recvQueue as complete when it exits, which ends this loop
                // instead of leaving Main blocked on an empty queue forever.
                foreach (string recvMsg in recvQueue.GetConsumingEnumerable())
                {
                    Console.WriteLine("Processed message: {0}", recvMsg);

                    var sendMsg = new Message
                    {
                        ClientId = ClientId.ToString(),
                        Payload = "Testing"
                    };
                    var json = new JavaScriptSerializer().Serialize(sendMsg);
                    sendQueue.Add(json);
                }
            }
            finally
            {
                // No more outgoing messages: lets Send drain what is queued and return,
                // so no send is in flight when the close handshake starts.
                sendQueue.CompleteAdding();
                await ObserveWorker(sendTask, "Send");

                // CloseAsync throws on a socket that is already closed or aborted,
                // so only attempt the handshake while it can still succeed.
                if (ws.State == WebSocketState.Open || ws.State == WebSocketState.CloseReceived)
                {
                    Console.WriteLine("Closing connection...");
                    try
                    {
                        await ws.CloseAsync(WebSocketCloseStatus.Empty, "", token);
                    }
                    catch (WebSocketException ex)
                    {
                        Console.Error.WriteLine("Close handshake failed: {0}", ex.Message);
                    }
                }

                // Unblock a receive that may still be pending (only possible when the
                // loop above ended with an exception), then observe its outcome.
                cts.Cancel();
                await ObserveWorker(recvTask, "Recv");
            }
        }
    }

    /// <summary>
    /// Awaits a background task and reports its failure instead of rethrowing, so that a
    /// worker error cannot replace an exception already propagating out of <see cref="Main"/>.
    /// </summary>
    /// <param name="worker">The background task to wait for.</param>
    /// <param name="name">Label used in the error message.</param>
    private static async Task ObserveWorker(Task worker, string name)
    {
        try
        {
            await worker;
        }
        catch (OperationCanceledException)
        {
            // Expected when shutdown cancels a pending socket operation.
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine("{0} task failed: {1}", name, ex.Message);
            Environment.ExitCode = 1;
        }
    }

    /// <summary>
    /// Reads messages from the socket while it is open, decodes them as UTF-8 and adds each
    /// non-empty message to <see cref="recvQueue"/>. Always completes the queue on exit.
    /// </summary>
    /// <param name="ws">Connected socket to read from.</param>
    /// <param name="token">Cancels a pending receive.</param>
    async static Task Recv(ClientWebSocket ws, CancellationToken token)
    {
        Console.WriteLine("Recv task started...");
        try
        {
            var buffer = WebSocket.CreateClientBuffer(1024, 1024);

            // A stateful decoder carries incomplete multi-byte sequences over to the next
            // fragment; decoding each fragment independently would corrupt characters
            // that straddle a buffer boundary.
            Decoder decoder = Encoding.UTF8.GetDecoder();
            char[] chars = new char[Encoding.UTF8.GetMaxCharCount(buffer.Count)];
            var text = new StringBuilder();

            while (ws.State == WebSocketState.Open)
            {
                text.Clear();
                WebSocketReceiveResult result;
                do
                {
                    result = await ws.ReceiveAsync(buffer, token);
                    if (result.MessageType == WebSocketMessageType.Close)
                    {
                        // The peer started the close handshake; nothing more to read.
                        return;
                    }

                    // flush on the final fragment resets the decoder for the next message.
                    int charCount = decoder.GetChars(
                        buffer.Array, buffer.Offset, result.Count, chars, 0, result.EndOfMessage);
                    text.Append(chars, 0, charCount);
                } while (!result.EndOfMessage);

                if (text.Length > 0)
                {
                    string jsonResult = text.ToString();
                    Console.WriteLine("Queueing {0}", jsonResult);
                    recvQueue.Add(jsonResult);
                }
            }
        }
        finally
        {
            // Signals the consumer in Main that no further messages will arrive,
            // whether this method ended normally or with an exception.
            recvQueue.CompleteAdding();
            Console.WriteLine("Recv task exiting...");
        }
    }

    /// <summary>
    /// Takes messages from <see cref="sendQueue"/> and writes each one to the socket as a
    /// single UTF-8 encoded frame. Returns when the queue is completed or the socket is
    /// no longer open.
    /// </summary>
    /// <param name="ws">Connected socket to write to.</param>
    /// <param name="token">Cancels the queue wait and a pending send.</param>
    async static Task Send(ClientWebSocket ws, CancellationToken token)
    {
        Console.WriteLine("Send task started...");

        // The enumeration blocks while the queue is empty and ends once
        // sendQueue.CompleteAdding() has been called and the queue is drained.
        foreach (string sendMsg in sendQueue.GetConsumingEnumerable(token))
        {
            if (ws.State != WebSocketState.Open)
            {
                break;
            }

            Console.WriteLine("Sending {0}", sendMsg);
            byte[] sendMsgBytes = Encoding.UTF8.GetBytes(sendMsg);
            var segmentBuffer = new ArraySegment<byte>(sendMsgBytes, 0, sendMsgBytes.Length);

            // NOTE(review): the payload is UTF-8 JSON but goes out as a Binary frame; kept
            // as-is to stay wire-compatible with the existing server — confirm whether
            // WebSocketMessageType.Text is expected.
            await ws.SendAsync(segmentBuffer, WebSocketMessageType.Binary, true, token);
        }

        Console.WriteLine("Send task exiting...");
    }
}
/// <summary>
/// Payload exchanged with the server; serialized to JSON before it is queued for sending.
/// </summary>
class Message
{
    /// <summary>Identifier of the client that produced the message.</summary>
    public string ClientId { get; set; }

    /// <summary>Message content.</summary>
    public string Payload { get; set; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment