Skip to content

Instantly share code, notes, and snippets.

@gx578007
Last active July 4, 2019 11:37
Show Gist options
  • Save gx578007/637a40195a32a451b74ada20e9e53d20 to your computer and use it in GitHub Desktop.
Save gx578007/637a40195a32a451b74ada20e9e53d20 to your computer and use it in GitHub Desktop.
#include "message_listener.h"
// This function is called in the event loop when "uv_async_send(&AW->async)" is executed in "msg_listener.h".
void invokeHandlers(uv_async_t *async){
Isolate * isolate = Isolate::GetCurrent();
v8::HandleScope handleScope(isolate);
// Get required data from async handler.
// Reference: "AW->async.data = AW;" in "listenMessage".
AsyncWork *work = static_cast<AsyncWork*>(async->data);
// Since "work->msgArr" can be manipulated simultaneously from multiple threads,
// it is required to lock it when accessing it.
uv_rwlock_wrlock(&work->lock);
// Make a copy and clear the original vector to reduce critical section.
// However, if the size of "msgArr" is very large, the copy operation can be very expensive.
// At that condition, expanding the locked section or using lock-free strategyies might be better than copying.
auto msgArr = work->msgArr;
work->msgArr.clear();
uv_rwlock_wrunlock(&work->lock);
for (int i=0; i<msgArr.size(); i++){
// processing
}
// Invoke all the stored handlers.
Local<Value> argv[2] = {err, rst};
for (uint32_t j=0; j<work->handlers.size(); j++){
Local<Function>::New(isolate, work->handlers[j])->Call(isolate->GetCurrentContext()->Global(), 2, argv);
}
}
void listenMessage(const FunctionCallbackInfo<Value> &args){
Isolate * isolate = Isolate::GetCurrent();
auto arg0 = Local<Function>::Cast(args[0]);
// Store local-typed functions as persistent copyable functions.
// Be careful to manage the life cycle of persistent functions to avoid memory leak.
// However, in this gist, it is skipped.
awork->callbacks.push_back(Persistent<Function, CopyablePersistentTraits<Function>>(isolate, arg0));
// Store "AW" in the async handler to allow reverse reference in "invokeHandlers".
AW->async.data = AW;
//Register the async handler to allow wakeup the event loop and get a callback called from another thread.
uv_async_init(uv_default_loop(), &AW->async, invokeHandlers);
}
void Init(Local<Object> exports) {
NODE_SET_METHOD(exports, "listenMessage", listenMessage);
}
NODE_MODULE(addon, Init);
// Define the structure to bind async handler and all the required data
// including all the events and event handlers.
struct AsyncWork {
uv_async_t async;
std::vector<string> msgArr;
std::vector<Persistent<Function, CopyablePersistentTraits<Function>>> handlers;
uv_rwlock_t lock;
};
// To simplify explanation, we declare a global instance to store and share required data.
static AsyncWork* AW = new AsyncWork();
// This function is called when receiving messages from websocket.
// The calling thread is different from the event loop thread.
// And this function is likely to be executed simultaneously on multiple threads.
void onMessage(const std::string& msg){
// Lock when adding new messages to the buffer.
uv_rwlock_wrlock(&AW->lock);
AW->msgArr.push_back(msg);
uv_rwlock_wrunlock(&AW->lock);
// Wakeup the event loop to handle stored messages.
// In this example, the function "invokeHandlers" will be called then.
uv_async_send(&AW->async);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment