|
// THIS IS NOT PRODUCTION-READY CODE! |
|
#define UWS_THREADSAFE |
|
#include <uWS/uWS.h> |
|
#include <iostream> |
|
#include <fstream> |
|
#include <sstream> |
|
#include <string> |
|
#include <random> |
|
#include <unordered_map> |
|
#include <vector> |
|
#include <stdint.h> |
|
#include <chrono> |
|
#include <thread> |
|
#include <mutex> |
|
#include <shared_mutex> |
|
|
|
|
|
static const std::string ENDPOINT_HTTP("/auth"); |
|
static const std::string ENDPOINT_CLIENT_JS("/clientjs.js"); |
|
static const std::string ENDPOINT_WS("/ws_endpoint/"); |
|
static const std::string ENDPOINT_MANAGE("/manage/"); |
|
static const std::string INDEX_URL("/"); |
|
static const std::string EMPTY_RESPONSE("<html></html>"); |
|
static const std::string LORUM_IPSUM("0123456789Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."); |
|
|
|
static const std::string COMMAND_SET_STICK("stick/"); |
|
|
|
static std::stringstream indexHtml; |
|
static std::stringstream clientJs; |
|
static int ENABLE_NODELAY = 1; |
|
|
|
static double CONFIG_TICKRATE = 20; |
|
|
|
// use a std::vector to group connections from a single client. |
|
// hopefully this doesn't kill performance |
|
std::unordered_map< long, std::vector<uWS::WebSocket<uWS::SERVER>*> > clients; |
|
std::shared_timed_mutex clients_write_mutex; |
|
|
|
struct sock_userdata_s { |
|
long owningToken; |
|
size_t sockNum; |
|
bool isBusy; |
|
}; |
|
|
|
void httpres_send_empty_end(uWS::HttpResponse *res) { |
|
res->end(EMPTY_RESPONSE.data(), EMPTY_RESPONSE.length()); |
|
} |
|
|
|
bool get_sock_busy(uWS::WebSocket<uWS::SERVER>* sock) { |
|
sock_userdata_s* ud = static_cast<sock_userdata_s*>(sock->getUserData()); |
|
return ud->isBusy; |
|
} |
|
|
|
void set_sock_busy(uWS::WebSocket<uWS::SERVER>* sock) { |
|
sock_userdata_s* ud = static_cast<sock_userdata_s*>(sock->getUserData()); |
|
ud->isBusy = true; |
|
} |
|
|
|
void clear_sock_busy(uWS::WebSocket<uWS::SERVER>* sock) { |
|
sock_userdata_s* ud = static_cast<sock_userdata_s*>(sock->getUserData()); |
|
ud->isBusy = false; |
|
} |
|
|
|
bool app_send_message(long token, const char *message, size_t length, uWS::OpCode opCode, std::chrono::duration<long, std::milli> maxWaitMillis) { |
|
if (!clients_write_mutex.try_lock_for(maxWaitMillis)) { // was shared lock, but library appears to be broken |
|
std::cout << "Unable to get lock! (maybe timed out?)" << std::endl; |
|
return false; |
|
} |
|
// XXX: If this throws something other than std::out_of_range (which it shouldn't) we might have locking problems! |
|
// TODO: use a lock instead of the raw mutex |
|
try { |
|
std::vector<uWS::WebSocket<uWS::SERVER>*> connections = clients.at(token); |
|
for (size_t i = 0; i < connections.size(); i++) { |
|
uWS::WebSocket<uWS::SERVER>* ws = connections.at(i); |
|
if (!get_sock_busy(ws) || i == connections.size() - 1) { |
|
if (get_sock_busy(ws)) { // implied no channels ready but we need to send anyway |
|
std::cerr << "No channels ready!" << std::endl; |
|
} |
|
//std::cout << "Sending on channel " << i << std::endl; |
|
// WebSocket abstracts away the stream nature of TCP and allows us to send messages |
|
ws->send(message, length, opCode); |
|
set_sock_busy(ws); |
|
//std::cout << "Set conn state to busy, right? isBusy is " << isBusy << std::endl; |
|
break; |
|
} |
|
} |
|
} catch (std::out_of_range e) { |
|
std::cerr << "Tried to send message to invalid client!" << std::endl; |
|
clients_write_mutex.unlock(); |
|
return false; |
|
} |
|
clients_write_mutex.unlock(); // was shared, but library appears to be broken |
|
return true; |
|
} |
|
|
|
// one thread is spawned for each client that gets "authenticated") |
|
void data_feed_per_client(long token) { |
|
/* |
|
std::cout << "Started a thread to serve session " << token << std::endl; |
|
std::this_thread::sleep_for(std::chrono::milliseconds(3000)); |
|
// now, send messages to the client. |
|
std::cout << "Now sending data to session " << token << std::endl; |
|
while (app_send_message(token, LORUM_IPSUM.data(), LORUM_IPSUM.length(), uWS::OpCode::TEXT, std::chrono::milliseconds((int)(1000 / CONFIG_TICKRATE)))) { |
|
std::this_thread::sleep_for(std::chrono::milliseconds((int)(1000 / CONFIG_TICKRATE))); |
|
} |
|
std::cout << "Data feeder for session " << token << " exiting: returned false (session no longer exists?)" << std::endl; |
|
*/ |
|
} |
|
|
|
int main() |
|
{ |
|
indexHtml << std::ifstream ("index.html").rdbuf(); |
|
if (!indexHtml.str().length()) { |
|
std::cerr << "Failed to load index.html" << std::endl; |
|
return -1; |
|
} |
|
clientJs << std::ifstream ("clientjs.js").rdbuf(); |
|
if (!clientJs.str().length()) { |
|
std::cerr << "Failed to load clientjs.js" << std::endl; |
|
return -1; |
|
} |
|
|
|
uWS::Hub h; |
|
std::mt19937 rng(std::random_device{}()); // !!! THIS IS NOT GOOD ENOUGH !!! DON'T USE THIS IN PRODUCTION! |
|
h.onHttpRequest([&rng](uWS::HttpResponse *res, uWS::HttpRequest req, char *data, size_t, size_t) { |
|
uWS::Header urlHeader = req.getUrl(); |
|
std::string url = urlHeader.toString(); |
|
std::cout << "HTTP Request at URL " << url << " from " << res->httpSocket->getAddress().address << ":" << res->httpSocket->getAddress().port << std::endl; |
|
if (url == INDEX_URL) { |
|
res->end(indexHtml.str().data(), indexHtml.str().length()); |
|
} |
|
else if (url == ENDPOINT_CLIENT_JS) { |
|
res->end(clientJs.str().data(), clientJs.str().length()); |
|
} |
|
else if (url == ENDPOINT_HTTP) { |
|
long token = rng(); |
|
clients.insert({token, std::vector<uWS::WebSocket<uWS::SERVER>*>()}); |
|
std::string resp = std::string("{\"token\" : ") + std::to_string(token) + ", \"maxConnections\" : 10}"; |
|
res->end(resp.data(), resp.length()); |
|
std::cout << "Authorized session " << token << std::endl; |
|
//std::thread t(data_feed_per_client, token); |
|
//t.detach(); |
|
} |
|
else if (url.compare(0, ENDPOINT_MANAGE.length(), ENDPOINT_MANAGE) == 0) { |
|
try { |
|
std::string commandStr = url.substr(ENDPOINT_MANAGE.length(), url.length() - ENDPOINT_MANAGE.length()); |
|
std::string valueStr; |
|
if (commandStr.compare(0, COMMAND_SET_STICK.length(), COMMAND_SET_STICK) == 0) { |
|
valueStr = commandStr.substr(COMMAND_SET_STICK.length(), commandStr.length() - COMMAND_SET_STICK.length()); |
|
CONFIG_TICKRATE = std::stol(valueStr); |
|
std::cout << "Set server-to-client tick rate to " << CONFIG_TICKRATE << " ticks/second" << std::endl; |
|
} |
|
else { |
|
std::cerr << "Recieved an unknown control message!" << std::endl; |
|
} |
|
} catch (const std::out_of_range e) { |
|
std::cerr << "Received an invalid control message!" << std::endl; |
|
} |
|
httpres_send_empty_end(res); |
|
} |
|
else { |
|
httpres_send_empty_end(res); |
|
} |
|
}); |
|
|
|
|
|
h.onConnection([&h](uWS::WebSocket<uWS::SERVER> *ws, uWS::HttpRequest req) { |
|
// in a real app, we would ensure the user is authorized. |
|
// we should set userdata on the socket so onMessage can clear the busy flag |
|
ws->setNoDelay(ENABLE_NODELAY); |
|
std::cout << "Got a new connection from " << ws->getAddress().address << ":" << ws->getAddress().port << std::endl; |
|
uWS::Header urlHeader = req.getUrl(); |
|
std::string url = urlHeader.toString(); |
|
if (url.compare(0, ENDPOINT_WS.length(), ENDPOINT_WS) != 0) { |
|
std::cerr << "Someone attempted to connect on a nonexistent endpoint!" << std::endl; |
|
ws->close(); |
|
} |
|
try { |
|
// need to get client's token here |
|
std::string tokenStr = url.substr(ENDPOINT_WS.length(), url.length() - ENDPOINT_WS.length()); |
|
long token = std::stol(tokenStr); |
|
std::vector<uWS::WebSocket<uWS::SERVER>*>& connectionsFromClient = clients.at(token); |
|
sock_userdata_s* ud = new sock_userdata_s({token, connectionsFromClient.size(), false}); |
|
ws->setUserData(ud); |
|
connectionsFromClient.insert(connectionsFromClient.end(), ws); |
|
std::cout << "Session " << token << " now connected with " << connectionsFromClient.size() << " active connections" << std::endl; |
|
} catch (const std::out_of_range e) { |
|
std::cerr << "Someone attempted to connect with an unrecognized token!" << std::endl; |
|
sock_userdata_s* ud = new sock_userdata_s({0, 0, false}); // need to set so that we don't crash when removing the connection |
|
ws->setUserData(ud); |
|
ws->close(); |
|
} |
|
}); |
|
|
|
// we should remove connections from the vector if/when they are dropped. |
|
h.onDisconnection([&h](uWS::WebSocket<uWS::SERVER> *ws, int code, char *message, size_t length) { |
|
// this needs to lock so the socket is removed from the vector before it is freed (when this returns) |
|
try { |
|
clients_write_mutex.lock(); |
|
} catch (std::exception e) { |
|
std::cerr << "Failed to get mutex!?" << std::endl; |
|
return; |
|
} |
|
//std::cout << "Got mutex for exclusive access" << std::endl; |
|
sock_userdata_s* ud = static_cast<sock_userdata_s*>(ws->getUserData()); |
|
long token = ud->owningToken; |
|
size_t sockNum = ud->sockNum; |
|
// We MUST renumber the sockets or we will go out-of-bounds on the vector! |
|
try { |
|
std::vector<uWS::WebSocket<uWS::SERVER>*>& connections = clients.at(token); |
|
auto iter = connections.begin() + sockNum; |
|
if (iter >= connections.end()) { |
|
std::cerr << "Tried to remove a socket beyond the end of the vector!" << std::endl; |
|
} |
|
connections.erase(iter); // iter is invalidated |
|
std::cout << "Removed disconnected socket from session " << token << " (" << connections.size() << " remain)" << std::endl; |
|
// renumber remaining sockets |
|
for (size_t i = 0; i < connections.size(); i++) { |
|
static_cast<sock_userdata_s*>(connections.at(i)->getUserData())->sockNum = i; |
|
} |
|
if (connections.size() == 0) { |
|
clients.erase(clients.find(token)); |
|
std::cout << "Closed session " << token << std::endl; |
|
} |
|
} catch (std::out_of_range e) { |
|
std::cerr << "Disconnect from a nonexistent socket?!" << std::endl; |
|
} |
|
delete ud; |
|
//std::cout << "Releasing mutex from exlusive mode" << std::endl; |
|
clients_write_mutex.unlock(); |
|
}); |
|
|
|
h.onMessage([&h](uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length, uWS::OpCode opCode) { |
|
// if this is an acknowlegement, flag channel is ready. |
|
if (length == 1 && *message == '*') { |
|
// reset busy flag |
|
clear_sock_busy(ws); |
|
} |
|
//std::cout << "Got a message!" << std::endl; |
|
else if (opCode == uWS::OpCode::TEXT || opCode == uWS::OpCode::BINARY) { |
|
// you would send the message off for processing here, we just ack |
|
ws->send("*", 1, uWS::OpCode::TEXT); |
|
} |
|
}); |
|
std::srand(std::time(0)); |
|
std::cout << "OK, ready to start!" << std::endl; |
|
std::cout << "Server is set up! Use the management endpoint to control it!" << std::endl; |
|
h.listen(3000); |
|
h.run(); |
|
|
|
// now, this thread goes to sleep |
|
while (true) { |
|
std::this_thread::sleep_for(std::chrono::milliseconds(5000)); |
|
} |
|
} |