Last active
September 3, 2019 09:27
-
-
Save bamkrs/fe1348a391b9ea7e4e74d93b761bbe0d to your computer and use it in GitHub Desktop.
oatpp Websocket Event Distributor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Publishes one shared Events instance as an oatpp component under the
// qualifier "eventDistributor"; consumers retrieve it with
// OATPP_COMPONENT(std::shared_ptr<Events>, <name>, "eventDistributor").
OATPP_CREATE_COMPONENT(std::shared_ptr<Events>, events)(
  "eventDistributor" /* qualifier */,
  Events::createShared()
);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Logs the payload and then calls sendOneFrameText(data) on every WebSocket
 * currently stored in m_ws.
 *
 * m_repositoryMutex is held for the whole call, so the repository cannot be
 * modified through registerWS()/unregisterWS() while the loop is running.
 *
 * @param data payload handed unchanged to each socket.
 */
void Events::distributeEvent(const oatpp::String &data) {
  std::lock_guard<std::mutex> repositoryGuard(m_repositoryMutex);

  OATPP_LOGD(TAG, "Distributing event %s", data->c_str());

  for (const oatpp::websocket::WebSocket* socket : m_ws) {
    socket->sendOneFrameText(data);
  }
}
/**
 * Default constructor. Emits a debug log line; the repository starts empty
 * because m_ws is default-constructed.
 */
Events::Events()
{
  OATPP_LOGD(TAG, "created");
}
std::shared_ptr<Events> Events::createShared() { | |
OATPP_LOGD(TAG, "creating"); | |
return std::make_shared<Events>(); | |
} | |
/**
 * Adds a WebSocket pointer to the repository unless it is already present.
 *
 * Only the raw pointer is stored; this function takes no ownership. The
 * repository mutex is held for the duration of the call.
 *
 * @param ws socket to register. A null pointer is ignored, because
 *           distributeEvent() dereferences every stored entry.
 */
void Events::registerWS(const oatpp::websocket::WebSocket *ws) {
  std::lock_guard<std::mutex> lock(m_repositoryMutex);

  if (ws == nullptr) {
    OATPP_LOGD(TAG, "Ignoring attempt to register a null WS");
    return;
  }

  // Pointers must be printed with %p (argument converted to void*); passing a
  // pointer to %u is undefined behaviour and truncates on 64-bit targets.
  const void *address = static_cast<const void*>(ws);

  if (findWS(ws) != m_ws.end()) {
    OATPP_LOGD(TAG, "WS %p already registered", address);
    return;
  }

  OATPP_LOGD(TAG, "Registering WS %p", address);
  m_ws.push_back(ws);
}
/**
 * Removes a WebSocket pointer from the repository if it is present.
 *
 * Calling this for a socket that is not registered is harmless; it only
 * produces a debug log line. The repository mutex is held for the duration
 * of the call.
 *
 * @param ws socket to remove.
 */
void Events::unregisterWS(const oatpp::websocket::WebSocket *ws) {
  std::lock_guard<std::mutex> lock(m_repositoryMutex);

  // Pointers must be printed with %p (argument converted to void*); passing a
  // pointer to %u is undefined behaviour and truncates on 64-bit targets.
  const void *address = static_cast<const void*>(ws);

  const auto position = findWS(ws);
  if (position == m_ws.end()) {
    OATPP_LOGD(TAG, "WS %p was not registered", address);
    return;
  }

  OATPP_LOGD(TAG, "Unregistering WS %p", address);
  m_ws.erase(position);
}
/**
 * Linear search of m_ws for the given pointer value.
 *
 * Does not lock m_repositoryMutex itself.
 *
 * @param ws pointer to look for (compared by address).
 * @return iterator to the first matching entry, or m_ws.end() if none matches.
 */
std::vector<const oatpp::websocket::WebSocket*>::iterator Events::findWS(const oatpp::websocket::WebSocket *ws) {
  auto cursor = m_ws.begin();
  const auto last = m_ws.end();

  // Advance until the entry matches or the range is exhausted; in the latter
  // case cursor equals end(), which is exactly the "not found" result.
  while (cursor != last && *cursor != ws) {
    ++cursor;
  }
  return cursor;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Events { | |
public: | |
Events(); | |
static std::shared_ptr<Events> createShared(); | |
/** | |
* Distributes Events with data | |
* @param data | |
*/ | |
void distributeEvent(const oatpp::String &data = ""); | |
/** | |
* Registeres an WebSocket ws at the EventManagers repository | |
* @param ws | |
*/ | |
void registerWS(const oatpp::websocket::WebSocket *ws); | |
/** | |
* Unregisteres an WebSocket ws from the EventManagers repository | |
* @param ws | |
*/ | |
void unregisterWS(const oatpp::websocket::WebSocket *ws); | |
private: | |
static constexpr const char* TAG = "Logger_EventDistributor"; | |
std::vector<const oatpp::websocket::WebSocket*> m_ws; /// The repository | |
/** | |
* Helperfunction to search an WebSocket in the repository m_ws. | |
*/ | |
std::vector<const oatpp::websocket::WebSocket*>::iterator findWS(const oatpp::websocket::WebSocket *ws); | |
std::mutex m_repositoryMutex; | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Close handler: logs the close code and removes the socket from the Events
 * component registered under the qualifier "eventDistributor".
 *
 * @param socket  the socket being closed; its address is the key that is
 *                unregistered.
 * @param code    close code, logged only.
 * @param message close message; not used by this handler.
 */
void WSListener::onClose(const WebSocket& socket, v_word16 code, const oatpp::String& message) {
  OATPP_LOGD(TAG, "onClose code=%d", code);

  OATPP_COMPONENT(std::shared_ptr<Events>, evs, "eventDistributor");

  // Scoped guard instead of manual lock()/unlock(): the mutex is released on
  // every exit path, including an exception thrown while unregistering.
  // NOTE(review): assumes m_socketMutex is a std::mutex-like type and that
  // <mutex> is included where WSListener is declared — confirm.
  std::lock_guard<decltype(m_socketMutex)> socketGuard(m_socketMutex);
  evs->unregisterWS(&socket); // Unregister on close
}
/**
 * Connection-created hook: increments the SOCKETS counter, attaches a new
 * WSListener to the socket and registers the socket with the Events
 * component published under the qualifier "eventDistributor".
 *
 * @param socket the newly created socket.
 * @param params connection parameters; not used here.
 */
void WSInstanceListener::onAfterCreate(const oatpp::websocket::WebSocket& socket, const std::shared_ptr<const ParameterMap>& params) {

  ++SOCKETS;
  OATPP_LOGD(TAG, "New Incoming Connection. Connection count=%d", SOCKETS.load());

  // Every connection gets its own WSListener instance. That may be more
  // than is needed in many cases.
  auto listener = std::make_shared<WSListener>();
  socket.setListener(listener);

  OATPP_COMPONENT(std::shared_ptr<Events>, distributor, "eventDistributor");
  distributor->registerWS(&socket);

}
/**
 * Connection-teardown hook: decrements the SOCKETS counter and removes the
 * socket from the Events component published under the qualifier
 * "eventDistributor".
 *
 * @param socket the socket about to be destroyed.
 */
void WSInstanceListener::onBeforeDestroy(const oatpp::websocket::WebSocket& socket) {

  --SOCKETS;
  OATPP_LOGD(TAG, "Connection closed. Connection count=%d", SOCKETS.load());

  OATPP_COMPONENT(std::shared_ptr<Events>, distributor, "eventDistributor");

  // Defensive unregister: the socket may already have been removed by the
  // close handler, in which case this call only logs.
  distributor->unregisterWS(&socket);

}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment