Skip to content

Instantly share code, notes, and snippets.

@bamkrs
Last active September 3, 2019 09:27
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bamkrs/fe1348a391b9ea7e4e74d93b761bbe0d to your computer and use it in GitHub Desktop.
Save bamkrs/fe1348a391b9ea7e4e74d93b761bbe0d to your computer and use it in GitHub Desktop.
oatpp Websocket Event Distributor
OATPP_CREATE_COMPONENT(std::shared_ptr<Events>, events)("eventDistributor" /* qualifier */, [] {
return Events::createShared();
}());
void Events::distributeEvent(const oatpp::String &data) {
std::lock_guard<std::mutex> lock(m_repositoryMutex);
OATPP_LOGD(TAG, "Distributing event %s", data->c_str());
for(auto & m_w : m_ws) {
m_w->sendOneFrameText(data);
}
}
Events::Events() {
OATPP_LOGD(TAG, "created");
}
std::shared_ptr<Events> Events::createShared() {
OATPP_LOGD(TAG, "creating");
return std::make_shared<Events>();
}
void Events::registerWS(const oatpp::websocket::WebSocket *ws) {
std::lock_guard<std::mutex> lock(m_repositoryMutex);
if(findWS(ws) == m_ws.end()) {
OATPP_LOGD(TAG, "Registering WS %u", ws);
m_ws.push_back(ws);
} else {
OATPP_LOGD(TAG, "WS %u already registered", ws);
}
}
void Events::unregisterWS(const oatpp::websocket::WebSocket *ws) {
std::lock_guard<std::mutex> lock(m_repositoryMutex);
auto it = findWS(ws);
if(it != m_ws.end()) {
OATPP_LOGD(TAG, "Unregistering WS %u", ws);
m_ws.erase(it);
} else {
OATPP_LOGD(TAG, "WS %u was not registered", ws);
}
}
std::vector<const oatpp::websocket::WebSocket*>::iterator Events::findWS(const oatpp::websocket::WebSocket *ws) {
for(std::vector<const oatpp::websocket::WebSocket*>::iterator i = m_ws.begin(); i != m_ws.end(); ++i) {
if((*i) == ws) {
return i;
}
}
return m_ws.end();
}
class Events {
public:
Events();
static std::shared_ptr<Events> createShared();
/**
* Distributes Events with data
* @param data
*/
void distributeEvent(const oatpp::String &data = "");
/**
* Registeres an WebSocket ws at the EventManagers repository
* @param ws
*/
void registerWS(const oatpp::websocket::WebSocket *ws);
/**
* Unregisteres an WebSocket ws from the EventManagers repository
* @param ws
*/
void unregisterWS(const oatpp::websocket::WebSocket *ws);
private:
static constexpr const char* TAG = "Logger_EventDistributor";
std::vector<const oatpp::websocket::WebSocket*> m_ws; /// The repository
/**
* Helperfunction to search an WebSocket in the repository m_ws.
*/
std::vector<const oatpp::websocket::WebSocket*>::iterator findWS(const oatpp::websocket::WebSocket *ws);
std::mutex m_repositoryMutex;
};
void WSListener::onClose(const WebSocket& socket, v_word16 code, const oatpp::String& message) {
OATPP_LOGD(TAG, "onClose code=%d", code);
OATPP_COMPONENT(std::shared_ptr<Events>, evs, "eventDistributor");
m_socketMutex.lock();
evs->unregisterWS(&socket); // Unregister on close
m_socketMutex.unlock();
}
void WSInstanceListener::onAfterCreate(const oatpp::websocket::WebSocket& socket, const std::shared_ptr<const ParameterMap>& params) {
SOCKETS ++;
OATPP_LOGD(TAG, "New Incoming Connection. Connection count=%d", SOCKETS.load());
/* In this particular case we create one WSListener per each connection */
/* Which may be redundant in many cases */
socket.setListener(std::make_shared<WSListener>());
OATPP_COMPONENT(std::shared_ptr<Events>, evs, "eventDistributor");
evs->registerWS(&socket);
}
void WSInstanceListener::onBeforeDestroy(const oatpp::websocket::WebSocket& socket) {
SOCKETS --;
OATPP_LOGD(TAG, "Connection closed. Connection count=%d", SOCKETS.load());
OATPP_COMPONENT(std::shared_ptr<Events>, evs, "eventDistributor");
evs->unregisterWS(&socket); //unregister before destroy, better be safe then sorry!
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment