Last active
July 4, 2019 11:37
-
-
Save gx578007/637a40195a32a451b74ada20e9e53d20 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "message_listener.h" | |
// Async callback: called in the event loop after "uv_async_send(&AW->async)"
// is executed by "onMessage" in "message_listener.h".
// Drains the buffered messages and then calls every registered JS handler.
//
// @param async  The async handle; its "data" field points at the owning AsyncWork
//               (see "AW->async.data = AW;" in "listenMessage").
void invokeHandlers(uv_async_t *async){
  Isolate * isolate = Isolate::GetCurrent();
  v8::HandleScope handleScope(isolate);

  // Recover the shared state from the async handle.
  AsyncWork *work = static_cast<AsyncWork*>(async->data);

  // "work->msgArr" is appended to from other threads, so it may only be touched
  // while holding the lock. Swapping it into an empty local vector hands over the
  // whole buffer in O(1) (no per-message copy) and leaves the shared vector empty,
  // which keeps the critical section as short as possible.
  decltype(work->msgArr) msgArr;
  uv_rwlock_wrlock(&work->lock);
  msgArr.swap(work->msgArr);
  uv_rwlock_wrunlock(&work->lock);

  // NOTE(review): libuv may coalesce several uv_async_send calls into one callback,
  // so "msgArr" can hold many messages here, and a later wake-up can find it empty.
  for (const auto &msg : msgArr){
    // processing
    (void)msg;
  }

  // Invoke all the stored handlers.
  // NOTE(review): "err" and "rst" are not declared in this snippet; presumably they
  // are produced by the elided processing step above. Confirm before building.
  Local<Value> argv[2] = {err, rst};
  for (const auto &handler : work->handlers){
    Local<Function>::New(isolate, handler)->Call(isolate->GetCurrentContext()->Global(), 2, argv);
  }
}
void listenMessage(const FunctionCallbackInfo<Value> &args){ | |
Isolate * isolate = Isolate::GetCurrent(); | |
auto arg0 = Local<Function>::Cast(args[0]); | |
// Store local-typed functions as persistent copyable functions. | |
// Be careful to manage the life cycle of persistent functions to avoid memory leak. | |
// However, in this gist, it is skipped. | |
awork->callbacks.push_back(Persistent<Function, CopyablePersistentTraits<Function>>(isolate, arg0)); | |
// Store "AW" in the async handler to allow reverse reference in "invokeHandlers". | |
AW->async.data = AW; | |
//Register the async handler to allow wakeup the event loop and get a callback called from another thread. | |
uv_async_init(uv_default_loop(), &AW->async, invokeHandlers); | |
} | |
// Addon initialiser: exposes the native "listenMessage" function on the
// module's exports object under the JS name "listenMessage".
void Init(Local<Object> exports) {
  NODE_SET_METHOD(exports, "listenMessage", listenMessage);
}
// Registers "Init" as the initialiser of the addon named "addon".
// No trailing semicolon: NODE_MODULE is a macro, not a function call.
NODE_MODULE(addon, Init)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Define the structure to bind async handler and all the required data | |
// including all the events and event handlers. | |
struct AsyncWork { | |
uv_async_t async; | |
std::vector<string> msgArr; | |
std::vector<Persistent<Function, CopyablePersistentTraits<Function>>> handlers; | |
uv_rwlock_t lock; | |
}; | |
// To simplify explanation, a single global instance stores and shares the required data.
// No matching "delete" is visible in this snippet, so the instance lives until process exit.
// NOTE(review): "static" gives this internal linkage; if this declaration sits in a header
// that is included from more than one source file, each of them gets its own instance.
static AsyncWork* AW = new AsyncWork();
// This function is called when receiving messages from websocket.
// The calling thread is different from the event loop thread,
// and this function is likely to be executed simultaneously on multiple threads.
//
// @param msg  The received message; a copy is appended to the shared buffer.
void onMessage(const std::string& msg){
  // Hold the lock only for the append so concurrent producers and the
  // event-loop consumer never see the buffer mid-modification.
  uv_rwlock_wrlock(&AW->lock);
  AW->msgArr.push_back(msg);
  uv_rwlock_wrunlock(&AW->lock);

  // Wake up the event loop to handle stored messages.
  // In this example, the function "invokeHandlers" will be called then.
  // NOTE(review): assumes "listenMessage" has already run uv_async_init on
  // "AW->async"; sending on an uninitialised handle is not valid. Confirm call order.
  uv_async_send(&AW->async);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment