Skip to content

Instantly share code, notes, and snippets.

@Zirientis
Last active February 24, 2021 14:46
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Zirientis/16c60215c13880ada43014a533bf8e9e to your computer and use it in GitHub Desktop.
Save Zirientis/16c60215c13880ada43014a533bf8e9e to your computer and use it in GitHub Desktop.
var ENDPOINT_HTTP = "http://localhost:3000/auth"
var ENDPOINT_WS = "ws://localhost:3000/ws_endpoint/"
var DEBUG = {error: function(msg){console.error("[" + (WSCORE.token ? WSCORE.token : "-") + "] " + msg)}, warn: function(msg){console.warn("[" + (WSCORE.token ? WSCORE.token : "-") + "] " + msg)}, log: function(msg){console.log("[" + (WSCORE.token ? WSCORE.token : "-") + "] " + msg)}, info: console.info, debug: console.debug};
var WSCORE = Object.create(null);
// first, contact the app server for WebSocket connection information.
// the app server uses other means of authentication to authenticate the client.
// the app server then sends a JSON payload to the client indicating how to
// setup WebSocket connections.
// initial contact
var setupXHR = new XMLHttpRequest();
setupXHR.onreadystatechange = function() {
DEBUG.log("readyState changed to " + setupXHR.readyState);
if (setupXHR.readyState === XMLHttpRequest.DONE && setupXHR.status == 200) {
DEBUG.log("bootstrapping...");
bootstrap();
}
};
setupXHR.open("GET", ENDPOINT_HTTP, true);
setupXHR.responseType = "json";
setupXHR.send();
DEBUG.log("Bootstrap request sent!");
function bootstrap() {
var resp = setupXHR.response;
WSCORE.token = resp.token;
WSCORE.sock = new Array(resp.maxConnections <= 20 ? resp.maxConnections : 20);
WSCORE.perf = Object.create(null);
WSCORE.perf.aggregate = Object.create(null);
WSCORE.perf.aggregate.token = resp.token;
WSCORE.perf.aggregate.all_channels_blocked = 0;
WSCORE.perf.persock = new Array(WSCORE.sock.length);
for (var i = 0; i < WSCORE.sock.length; i++) {
var endpoint = ENDPOINT_WS + WSCORE.token + "?conn=" + i;
WSCORE.sock[i] = new WebSocket(endpoint);
WSCORE.sock[i].onmessage = makeMessageHandler(WSCORE.sock[i], i);
WSCORE.sock[i].ready = true;
DEBUG.log("Created WebSocket connection to " + endpoint);
WSCORE.perf.persock[i] = Object.create(null);
WSCORE.perf.persock[i].messages_sent = 0;
WSCORE.perf.persock[i].messages_received = 0;
WSCORE.perf.persock[i].ack_sent = 0;
WSCORE.perf.persock[i].ack_received = 0;
WSCORE.perf.persock[i].channel_blocked_when_needed = 0;
}
WSCORE.sendMessage = function(msg) {
var success = false;
for (var i = 0; i < WSCORE.sock.length; i++) {
if (WSCORE.sock[i].ready) {
//DEBUG.log("Sending message on channel " + i); // XXX: This doesn't appear to be accurate sometimes! ACKs "appear" to go to another channel. Wireshark captures show everything is working as intended, however.
WSCORE.sock[i].send(msg); // consider sending arraybuffer
WSCORE.sock[i].ready = false;
WSCORE.perf.persock[i].messages_sent += 1;
success = true;
break;
}
else {
WSCORE.perf.persock[i].channel_blocked_when_needed += 1;
}
}
if (!success) {
DEBUG.warn("No WebSockets had zero latency!");
WSCORE.perf.aggregate.all_channels_blocked += 1;
WSCORE.sock[0].send(msg); WSCORE.sock[0].ready = false; // send it anyway?
}
};
WSCORE.handleMessage = function(chan, chanNum, e) {
if (e.data == "*") {
WSCORE.sock[chanNum].ready = true;
WSCORE.perf.persock[chanNum].ack_received += 1;
}
else {
//DEBUG.log("message recieved on channel " + chanNum);
WSCORE.perf.persock[chanNum].messages_received += 1;
sendAck(chan, chanNum); // ack to server
// handle message here
postMessage({type: "data", data: e.data});
}
};
setTimeout(function() {
DEBUG.log("Starting to transmit");
WSCORE.updateTimer = setInterval(function() {
WSCORE.sendMessage("ABCDEFGHIJKLMNOPQRSTUVWXYZ");
}, 50)
}, 5000);
/*WSCORE.perf.statUpdateTimer = setInterval(100, function() {
var cs_stat = document.getElementById("cs_stat");
cs_stat.innerHTML = "";
Object.getOwnPropertyNames(WSCORE.perf.aggregate).forEach(
function(val, idx, arr) {
cs_stat.innerHTML += val;
cs_stat.innerHTML += WSCORE.perf.aggregate[val];
}
);
});*/
}
function makeMessageHandler(sock, chanNum) {
return function(e) {
WSCORE.handleMessage(sock, chanNum, e);
};
}
function sendAck(chan, chanNum) {
chan.send("*");
WSCORE.perf.persock[chanNum].ack_sent += 1;
}
// message format:
// recieves object, {"type": "<TYPE>", "msg": "<MSG>"}
onmessage = function(e) {
switch (e.data.type) {
case "send":
sendMessage(e.data.msg);
break;
case "telemetry":
postMessage({type: "telemetry", data: WSCORE.perf});
DEBUG.debug(WSCORE.perf);
break;
case "close":
for (var i = 0; i < WSCORE.sock.length; i++) {
WSCORE.sock[i].close();
}
postMessage({type: "telemetry", data: WSCORE.perf});
clearInterval(WSCORE.updateTimer);
close();
break;
default:
DEBUG.warn("Unrecognized message type: " + e.data.type);
}
}
<!doctype html>
<html>
<!-- THIS IS NOT PRODUCTION-READY CODE -->
<head>
<script>
var ENDPOINT_MANAGE = "http://localhost:3000/manage/"
var DEBUG = {error: console.error, warn: console.warn, log: console.log, info: console.info, debug: console.debug};
var clients;
document.addEventListener("DOMContentLoaded", function() {
clients = new Array(1/*document.getElementById("numClients").value*/);
for (var i = 0; i < clients.length; i++) {
clients[i] = new Worker("clientjs.js");
clients[i].onmessage = function(e) {
switch (e.data.type) {
case "data":
//handleAppMessage(e.data.data);
break;
case "telemetry":
updateTelemetryValues(i, e.data.data);
break;
default:
DEBUG.warn("unknown message type from worker: " + e.data.type);
break;
}
};
setInterval(updateStats, 1000);
}
document.getElementById("setconfigbtn").disabled = false;
document.getElementById("numclientsbtn").disabled = false;
});
function setNumClients() {
var numClients = document.getElementById("numClients").value;
if (numClients > clients.length) { // add more clients
var numClientsToAdd = numClients - clients.length;
for (var i = 0; i < numClientsToAdd; i++) {
clients[clients.length] = new Worker("clientjs.js");
}
}
else if (numClients < clients.length) { // remove some clients
var numClientsToKill = clients.length - numClients;
for (var i = 0; i < numClientsToKill; i++) {
clients[clients.length - 1 - i].postMessage({type: "close"});
}
clients.splice(-1 * numClientsToKill, numClientsToKill);
}
}
function updateStats() {
for (var i = 0; i < clients.length; i++) {
clients[i].postMessage({"type": "telemetry"});
}
}
function updateTelemetryValues(clientNum, data) {
var elem = document.getElementById("client" + clientNum);
var text = "Client " + clientNum + " " + JSON.stringify(data);
elem.innerHTML = text;
}
</script>
<script>
function setConfig() {
var stick = document.getElementById("stick");
var ctick = document.getElementById("ctick");
//set client parametres
//set server parameters
XHR = new XMLHttpRequest();
XHR.open("GET", ENDPOINT_MANAGE + "stick/" + stick.value, true);
XHR.send();
}
</script>
</head>
<body>
<!-- <button onclick="btnclk()">Send event to server</button> -->
<br />
<div style="float:left">
<span id="cs_stat">
<p id="client0"></p>
<p id="client1"></p>
<p id="client2"></p>
<p id="client3"></p>
<p id="client4"></p>
</span>
</div>
<div style="float:right">
<!--<p>Configuration</p>
Number of active sessions <input type="number" id="numClients" min="0" value="5"><button id="numclientsbtn" onclick="setNumClients()" disabled="true">Set number of clients</button><br />
Server &rarr; Client tickrate (Hz) <input type="number" id="stick" min="1" value="20"><br />
Client &rarr; Server tickrate (Hz) <input type="number" id="ctick" min="1" value="20"><br />
<button id="setconfigbtn" onclick="setConfig()" disabled="true">Set tickrate</button><br />
<p>Telemetry Recording</p>
</div>-->
</body>
</html>
// THIS IS NOT PRODUCTION-READY CODE!
#define UWS_THREADSAFE
#include <uWS/uWS.h>
#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#include <random>
#include <unordered_map>
#include <vector>
#include <stdint.h>
#include <chrono>
#include <thread>
#include <mutex>
#include <shared_mutex>
static const std::string ENDPOINT_HTTP("/auth");
static const std::string ENDPOINT_CLIENT_JS("/clientjs.js");
static const std::string ENDPOINT_WS("/ws_endpoint/");
static const std::string ENDPOINT_MANAGE("/manage/");
static const std::string INDEX_URL("/");
static const std::string EMPTY_RESPONSE("<html></html>");
static const std::string LORUM_IPSUM("0123456789Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.");
static const std::string COMMAND_SET_STICK("stick/");
static std::stringstream indexHtml;
static std::stringstream clientJs;
static int ENABLE_NODELAY = 1;
static double CONFIG_TICKRATE = 20;
// use a std::vector to group connections from a single client.
// hopefully this doesn't kill performance
std::unordered_map< long, std::vector<uWS::WebSocket<uWS::SERVER>*> > clients;
std::shared_timed_mutex clients_write_mutex;
struct sock_userdata_s {
long owningToken;
size_t sockNum;
bool isBusy;
};
void httpres_send_empty_end(uWS::HttpResponse *res) {
res->end(EMPTY_RESPONSE.data(), EMPTY_RESPONSE.length());
}
bool get_sock_busy(uWS::WebSocket<uWS::SERVER>* sock) {
sock_userdata_s* ud = static_cast<sock_userdata_s*>(sock->getUserData());
return ud->isBusy;
}
void set_sock_busy(uWS::WebSocket<uWS::SERVER>* sock) {
sock_userdata_s* ud = static_cast<sock_userdata_s*>(sock->getUserData());
ud->isBusy = true;
}
void clear_sock_busy(uWS::WebSocket<uWS::SERVER>* sock) {
sock_userdata_s* ud = static_cast<sock_userdata_s*>(sock->getUserData());
ud->isBusy = false;
}
bool app_send_message(long token, const char *message, size_t length, uWS::OpCode opCode, std::chrono::duration<long, std::milli> maxWaitMillis) {
if (!clients_write_mutex.try_lock_for(maxWaitMillis)) { // was shared lock, but library appears to be broken
std::cout << "Unable to get lock! (maybe timed out?)" << std::endl;
return false;
}
// XXX: If this throws something other than std::out_of_range (which it shouldn't) we might have locking problems!
// TODO: use a lock instead of the raw mutex
try {
std::vector<uWS::WebSocket<uWS::SERVER>*> connections = clients.at(token);
for (size_t i = 0; i < connections.size(); i++) {
uWS::WebSocket<uWS::SERVER>* ws = connections.at(i);
if (!get_sock_busy(ws) || i == connections.size() - 1) {
if (get_sock_busy(ws)) { // implied no channels ready but we need to send anyway
std::cerr << "No channels ready!" << std::endl;
}
//std::cout << "Sending on channel " << i << std::endl;
// WebSocket abstracts away the stream nature of TCP and allows us to send messages
ws->send(message, length, opCode);
set_sock_busy(ws);
//std::cout << "Set conn state to busy, right? isBusy is " << isBusy << std::endl;
break;
}
}
} catch (std::out_of_range e) {
std::cerr << "Tried to send message to invalid client!" << std::endl;
clients_write_mutex.unlock();
return false;
}
clients_write_mutex.unlock(); // was shared, but library appears to be broken
return true;
}
// one thread is spawned for each client that gets "authenticated")
void data_feed_per_client(long token) {
/*
std::cout << "Started a thread to serve session " << token << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
// now, send messages to the client.
std::cout << "Now sending data to session " << token << std::endl;
while (app_send_message(token, LORUM_IPSUM.data(), LORUM_IPSUM.length(), uWS::OpCode::TEXT, std::chrono::milliseconds((int)(1000 / CONFIG_TICKRATE)))) {
std::this_thread::sleep_for(std::chrono::milliseconds((int)(1000 / CONFIG_TICKRATE)));
}
std::cout << "Data feeder for session " << token << " exiting: returned false (session no longer exists?)" << std::endl;
*/
}
int main()
{
indexHtml << std::ifstream ("index.html").rdbuf();
if (!indexHtml.str().length()) {
std::cerr << "Failed to load index.html" << std::endl;
return -1;
}
clientJs << std::ifstream ("clientjs.js").rdbuf();
if (!clientJs.str().length()) {
std::cerr << "Failed to load clientjs.js" << std::endl;
return -1;
}
uWS::Hub h;
std::mt19937 rng(std::random_device{}()); // !!! THIS IS NOT GOOD ENOUGH !!! DON'T USE THIS IN PRODUCTION!
h.onHttpRequest([&rng](uWS::HttpResponse *res, uWS::HttpRequest req, char *data, size_t, size_t) {
uWS::Header urlHeader = req.getUrl();
std::string url = urlHeader.toString();
std::cout << "HTTP Request at URL " << url << " from " << res->httpSocket->getAddress().address << ":" << res->httpSocket->getAddress().port << std::endl;
if (url == INDEX_URL) {
res->end(indexHtml.str().data(), indexHtml.str().length());
}
else if (url == ENDPOINT_CLIENT_JS) {
res->end(clientJs.str().data(), clientJs.str().length());
}
else if (url == ENDPOINT_HTTP) {
long token = rng();
clients.insert({token, std::vector<uWS::WebSocket<uWS::SERVER>*>()});
std::string resp = std::string("{\"token\" : ") + std::to_string(token) + ", \"maxConnections\" : 10}";
res->end(resp.data(), resp.length());
std::cout << "Authorized session " << token << std::endl;
//std::thread t(data_feed_per_client, token);
//t.detach();
}
else if (url.compare(0, ENDPOINT_MANAGE.length(), ENDPOINT_MANAGE) == 0) {
try {
std::string commandStr = url.substr(ENDPOINT_MANAGE.length(), url.length() - ENDPOINT_MANAGE.length());
std::string valueStr;
if (commandStr.compare(0, COMMAND_SET_STICK.length(), COMMAND_SET_STICK) == 0) {
valueStr = commandStr.substr(COMMAND_SET_STICK.length(), commandStr.length() - COMMAND_SET_STICK.length());
CONFIG_TICKRATE = std::stol(valueStr);
std::cout << "Set server-to-client tick rate to " << CONFIG_TICKRATE << " ticks/second" << std::endl;
}
else {
std::cerr << "Recieved an unknown control message!" << std::endl;
}
} catch (const std::out_of_range e) {
std::cerr << "Received an invalid control message!" << std::endl;
}
httpres_send_empty_end(res);
}
else {
httpres_send_empty_end(res);
}
});
h.onConnection([&h](uWS::WebSocket<uWS::SERVER> *ws, uWS::HttpRequest req) {
// in a real app, we would ensure the user is authorized.
// we should set userdata on the socket so onMessage can clear the busy flag
ws->setNoDelay(ENABLE_NODELAY);
std::cout << "Got a new connection from " << ws->getAddress().address << ":" << ws->getAddress().port << std::endl;
uWS::Header urlHeader = req.getUrl();
std::string url = urlHeader.toString();
if (url.compare(0, ENDPOINT_WS.length(), ENDPOINT_WS) != 0) {
std::cerr << "Someone attempted to connect on a nonexistent endpoint!" << std::endl;
ws->close();
}
try {
// need to get client's token here
std::string tokenStr = url.substr(ENDPOINT_WS.length(), url.length() - ENDPOINT_WS.length());
long token = std::stol(tokenStr);
std::vector<uWS::WebSocket<uWS::SERVER>*>& connectionsFromClient = clients.at(token);
sock_userdata_s* ud = new sock_userdata_s({token, connectionsFromClient.size(), false});
ws->setUserData(ud);
connectionsFromClient.insert(connectionsFromClient.end(), ws);
std::cout << "Session " << token << " now connected with " << connectionsFromClient.size() << " active connections" << std::endl;
} catch (const std::out_of_range e) {
std::cerr << "Someone attempted to connect with an unrecognized token!" << std::endl;
sock_userdata_s* ud = new sock_userdata_s({0, 0, false}); // need to set so that we don't crash when removing the connection
ws->setUserData(ud);
ws->close();
}
});
// we should remove connections from the vector if/when they are dropped.
h.onDisconnection([&h](uWS::WebSocket<uWS::SERVER> *ws, int code, char *message, size_t length) {
// this needs to lock so the socket is removed from the vector before it is freed (when this returns)
try {
clients_write_mutex.lock();
} catch (std::exception e) {
std::cerr << "Failed to get mutex!?" << std::endl;
return;
}
//std::cout << "Got mutex for exclusive access" << std::endl;
sock_userdata_s* ud = static_cast<sock_userdata_s*>(ws->getUserData());
long token = ud->owningToken;
size_t sockNum = ud->sockNum;
// We MUST renumber the sockets or we will go out-of-bounds on the vector!
try {
std::vector<uWS::WebSocket<uWS::SERVER>*>& connections = clients.at(token);
auto iter = connections.begin() + sockNum;
if (iter >= connections.end()) {
std::cerr << "Tried to remove a socket beyond the end of the vector!" << std::endl;
}
connections.erase(iter); // iter is invalidated
std::cout << "Removed disconnected socket from session " << token << " (" << connections.size() << " remain)" << std::endl;
// renumber remaining sockets
for (size_t i = 0; i < connections.size(); i++) {
static_cast<sock_userdata_s*>(connections.at(i)->getUserData())->sockNum = i;
}
if (connections.size() == 0) {
clients.erase(clients.find(token));
std::cout << "Closed session " << token << std::endl;
}
} catch (std::out_of_range e) {
std::cerr << "Disconnect from a nonexistent socket?!" << std::endl;
}
delete ud;
//std::cout << "Releasing mutex from exlusive mode" << std::endl;
clients_write_mutex.unlock();
});
h.onMessage([&h](uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length, uWS::OpCode opCode) {
// if this is an acknowlegement, flag channel is ready.
if (length == 1 && *message == '*') {
// reset busy flag
clear_sock_busy(ws);
}
//std::cout << "Got a message!" << std::endl;
else if (opCode == uWS::OpCode::TEXT || opCode == uWS::OpCode::BINARY) {
// you would send the message off for processing here, we just ack
ws->send("*", 1, uWS::OpCode::TEXT);
}
});
std::srand(std::time(0));
std::cout << "OK, ready to start!" << std::endl;
std::cout << "Server is set up! Use the management endpoint to control it!" << std::endl;
h.listen(3000);
h.run();
// now, this thread goes to sleep
while (true) {
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
}
}
sudo tc qdisc replace dev lo root netem delay 100ms 30ms drop 10%
g++ server.cpp -g -std=c++14 -luWS -lssl -lz -pthread -Wall -Wextra -fsanitize=address -O0 -o server
sudo sh -c 'echo 1 > /proc/sys/net/ipv4/tcp_thin_linear_timeouts'
sudo sh -c 'echo 1 > /proc/sys/net/ipv4/tcp_thin_dupack'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment