Skip to content

Instantly share code, notes, and snippets.

@open-tommie
Last active December 16, 2015 00:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save open-tommie/5347832 to your computer and use it in GitHub Desktop.
Save open-tommie/5347832 to your computer and use it in GitHub Desktop.
同期的な関数からNode.js関数を呼び出す実験の検証プログラムです。 Node.jsとは別のスレッドで同期的な関数を実行し、 スレッド間通信でNode.js関数を呼び出します。 Node.jsメーリングリストから。
●概要
  同期的な関数からNode.js関数を呼び出す実験の検証プログラムです。
  Node.jsとは別のスレッドで同期的な関数を実行し、
  スレッド間通信でNode.js関数を呼び出します。
  Node.jsメーリングリストから。
  以下全て、Fedora18で検証しています。Windowsでは多少、修整が必要です。
・インストール準備
  以下のモジュールをインストールして下さい。
    1.node-gyp
    2.socket.io
    3.jQuery
  "js"ディレクトリを作成し、
  以下のファイルを"js"ディレクトリへコピーして下さい。
    ・jQueryから
      WebSocketMain.swf
      WebSocketMainInsecure.swf
      jquery-1.9.1.min.js
    ・socket.ioから
      socket.io.js
      socket.io.min.js
・ファイル構成
.
|-- plugin.cc
|-- plugin.h
|-- readme.txt
|-- sub.cc
|-- sub.so
|-- test.js
|-- tree.txt
|-- web.js
|-- main
|-- main.cc
|-- Makefile
|-- index.html
|-- binding.gyp
|-- js
| |-- WebSocketMain.swf
| |-- WebSocketMainInsecure.swf
| |-- jquery-1.9.1.min.js
| |-- socket.io.js
| `-- socket.io.min.js
(参考)
|-- node_modules
| |-- socket.io
| `-- websocket.io
|-- build
| |-- Makefile
| |-- Release
| | |-- addon.node
| | `-- obj.target
| | |-- addon
| | | `-- plugin.o
| | `-- addon.node
| |-- addon.target.mk
| |-- binding.Makefile
| `-- config.gypi
・インストール手順
  make
・テストの手順
  ・同期プログラムのテスト手順
    以下のコマンドを実行して標準出力へログ出力すればOKです。
      ./main
    ずっとログ出力するので、コントロールCで停止して下さい。
  
  ・Node.js関数呼び出しテスト手順
    0. テスト実行する前に、WebSocketサーバのIPアドレスを
      自分の環境に書き換えて下さい。
      "localhost"ではダメです。IPアドレスを書いて下さい。
      index.htmlの18行目:
      var socket = io.connect('http://192.168.0.105:10080/');
    1.webサーバを実行します。ポートを9080に指定して実行します。
      node web.js 9080
    2.別のターミナルでWebSocketサーバを実行します。WebSocketのポートは10080です。
      node test.js
    3.webブラウザで1.のページをポート9080で開きます。
      1.を実行したマシンのアドレスを指定します。
      http://192.168.0.105:9080/
      ブラウザは複数開けます。
     
    4.webページのテキストエリアにずっとログ出力していれば成功です。
     
・ファイルの説明
  ./main - 同期処理用プログラム
    同期処理のメインプログラム。sub.soを動的ロードします。
  sub.so - 同期的な関数の動的ライブラリです。
    addon.nodeと./mainから動的ロードされます。
  main.cc - 同期処理用メイン
    ./main用です。sub.ccの関数を呼び出します。
  sub.cc - 同期処理関数
    plugin_main()は同期処理です。Nodeを前提に書かれていません。
  test.js
    WebSocketサーバです。addon.nodeアドオンを読み、
    sub.soを動的ロードします。
    sub.soのplugin_main()がfooを呼び出して、
    メッセージをWebSocketで配信します。
  build/Release/obj.target/addon.node - Nodeアドオン
    test.jsがrequire()でロードするアドオンです。
  plugin.cc - Nodeアドオンaddon.nodeのメイン処理
    Nodeとは別のスレッドを作成し、別スレッドで
    動的ロードしたsub.so内のplugin_main()を実行します。
  index.html - Webページ
    WebSocketでサーバと接続して、
    受信したメッセージをテキストエリアへ表示します。
  binding.gyp - node-gyp定義ファイル
    Nodeアドオン用の定義ファイルです。
  web.js - Webサーバ
    https://gist.github.com/rpflorence/701407
・テストしたブラウザ
  Fedora18
    Chrome 27.0.1453.15 beta
  Windows8
    Chrome 26.0.1410.64 m
    IE10 10.0.4
  iPhone5(iOS6.1.3)
    safari
    Chrome 26.0.1410.50
以上。
{
"targets": [
{
"target_name": "addon",
"sources": [ "plugin.cc" ]
}
]
}
<!DOCTYPE html>
<html>
<head>
<meta name="viewport" content="initial-scale=1.0, user-scalable=yes" />
<meta charset="UTF-8" />
<title>log</title>
<script src="/js/jquery-1.9.1.min.js"></script>
<script src="/js/socket.io.js"></script>
<script>
window.onload = function() {
console.log( "onload()");
var g_msg = "";
var print = function( msg) {
g_msg = msg +"\n"+ g_msg;
$('#log').val( g_msg);
}
var socket = io.connect('http://192.168.0.105:10080/');
socket.on('connect', function () {
console.log( "onConnect()");
print( "接続開始しました。");
});
socket.on('message', function (msg) {
console.log( "onMessage() msg="+msg);
print( msg);
});
socket.on('disconnect', function () {
console.log( "onDisconnect()");
print( "接続が切れました。");
});
}
</script>
</head>
<body>
log<br>
<textarea id="log" cols="80" rows="20" maxlength="4000" >log area</textarea>
</body>
</html>
//
// Node.js test program
// 2013-04-13 by open.tommie@gmail.com
//
#include <stdio.h>
#include "plugin.h"
int main()
{
printf("main(): sart\n");
plugin_init( printf);
plugin_main();
printf("main(): end\n");
return 0;
}
CFLAGS=-c -I/usr/local/src/libuv/include \
-D_GNU_SOURCE -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64 -pthread
ADDON=build/Release/obj.target/addon.node
all: ./main sub.so ${ADDON}
main: main.cc sub.so plugin.h
g++ -o main main.cc sub.so
sub.so: sub.cc plugin.h
g++ -fPIC -shared sub.cc -o sub.so
build/Makefile:
node-gyp configure
${ADDON}: build/Makefile plugin.cc plugin.h
cd build; make
clean_target:
rm -f ./main sub.so ${ADDON}
#EOF
//
// libuv thread test program
// 2013-04-11 mod. by open.tommie@gmail.com
//
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <queue>
#include <cstdarg>
#include "uv.h"
#include "plugin.h"
//#define BUILDING_NODE_EXTENSION
#include <node.h>
using namespace v8;
using namespace std;
uv_loop_t *g_pLoop;
//uv_async_t g_async_node_print_main;
//uv_async_t g_async_close_all;
//static int g_closed = 0;
static int g_async_called = 0;
static uv_mutex_t g_mutex;
static int g_line_count = 0;
const int WORKER_MAX = 1; // must be 1
static uv_async_t g_async[ WORKER_MAX];
//uv_async_t g_async_node_print_main;
static int g_thread_no[ WORKER_MAX];
const int MSG_SIZE = 256;
struct QueMessage {
enum KIND { KIND_MESSAGE, KIND_CLOSE} m_kind;
int m_iThread_no;
char m_cMessage[ MSG_SIZE]; // message string
};
static queue<QueMessage*> g_msgQueue;
static void (*g_pfPlugin_main)( void) = NULL;
extern "C" void node_printf( const char* p_format, ...);
void node_callback( char* p_msg)
{
v8::Local<v8::Function> foo =
v8::Local<v8::Function>::Cast(Context::GetCurrent()->Global()->Get(v8::String::New("foo")));
const unsigned argc = 1;
v8::Handle<v8::Value> argv[argc] = { v8::String::New( p_msg) };
foo->Call(Context::GetCurrent()->Global(), argc, argv);
}
void node_print_main(uv_async_t *handle, int status /*UNUSED*/) {
g_async_called++;
uv_mutex_lock(&g_mutex);
while ( !g_msgQueue.empty()) {
QueMessage* pMsg = g_msgQueue.front();
g_msgQueue.pop();
switch ( pMsg->m_kind) {
case QueMessage::KIND_MESSAGE:
fprintf(stderr, "%2d: async_called=%3d, thread=%lu, From=%d, KIND_MSG: %s",
++g_line_count, g_async_called, uv_thread_self(),
pMsg->m_iThread_no, pMsg->m_cMessage);
node_callback( pMsg->m_cMessage);
break;
case QueMessage::KIND_CLOSE:
fprintf(stderr, "%2d: async_called=%3d, thread=%lu: From=%d, KIND_CLOSE\n",
++g_line_count, g_async_called, uv_thread_self(),
pMsg->m_iThread_no);
uv_close( (uv_handle_t*)&g_async[ pMsg->m_iThread_no], NULL);
break;
}
free( pMsg);
}
uv_mutex_unlock(&g_mutex);
}
void node_printf( const char* p_format, ...) {
va_list ap;
va_start(ap, p_format);
QueMessage *pMsg = NULL;
fprintf(stderr, "node_printf()\n");
pMsg = (QueMessage*)malloc( sizeof( QueMessage));
pMsg->m_kind = QueMessage::KIND_MESSAGE;
pMsg->m_iThread_no = -1; // not used
vsprintf( pMsg->m_cMessage, p_format, ap);
va_end(ap);
uv_mutex_lock(&g_mutex);
g_msgQueue.push( pMsg);
uv_mutex_unlock(&g_mutex);
uv_async_send(&g_async[ 0]); // must be WORKER==1
// free( msg) to be called later
}
void woker(uv_work_t *req) {
int thread_no = *(int*)( req->data);
fprintf(stderr, "%2d: g_async_called=%3d, thread=%lx: %s\n",
++g_line_count, g_async_called, uv_thread_self(), "woker() start");
//node_main();
if ( g_pfPlugin_main) {
g_pfPlugin_main();
}
// notify end of worker
QueMessage* pMsg = (QueMessage*)malloc( sizeof( QueMessage));
pMsg->m_kind = QueMessage::KIND_CLOSE;
pMsg->m_iThread_no = thread_no;
uv_mutex_lock(&g_mutex);
g_msgQueue.push( pMsg);
uv_mutex_unlock(&g_mutex);
uv_async_send(&g_async[ thread_no]);
fprintf(stderr, "%2d: g_async_called=%3d, thread=%lx: %s\n",
++g_line_count, g_async_called, uv_thread_self(), "woker() end");
}
void after0(uv_work_t *req, int status) {
// int thread_no = *((int*) req->data);
// g_closed++;
}
void lib_load( void) {
fprintf(stderr, "lib_load() start\n");
char* lib_name = (char*)"./sub.so";
uv_lib_t *lib = (uv_lib_t*) malloc(sizeof(uv_lib_t));
fprintf(stderr, "Loading %s\n", lib_name);
if (uv_dlopen( lib_name, lib)) {
fprintf(stderr, "Error: %s\n", uv_dlerror(lib));
return;
}
void (*pfPlugin_init)( PFunc) = NULL;
if (uv_dlsym(lib, "plugin_init", (void **) &pfPlugin_init)) {
fprintf(stderr, "dlsym error: %s\n", uv_dlerror(lib));
pfPlugin_init = NULL;
return;
}
pfPlugin_init( (PFunc)node_printf);
if (uv_dlsym(lib, "plugin_main", (void **) &g_pfPlugin_main)) {
fprintf(stderr, "dlsym error: %s\n", uv_dlerror(lib));
g_pfPlugin_main = NULL;
}
//uv_dlclose( lib);
}
void worker_main( void*) {
fprintf(stderr, "%2d: g_async_called=%3d, thread=%lx: %s\n",
++g_line_count, g_async_called, uv_thread_self(), "worker_main() start");
lib_load();
g_pLoop = uv_loop_new();
uv_work_t req[ WORKER_MAX];
uv_mutex_init(&g_mutex);
for ( int i = 0; i < WORKER_MAX; i++) {
g_thread_no[ i] = i;
req[ i].data = (void*) &g_thread_no[ i];;
uv_queue_work(g_pLoop, req+i, woker, after0);
}
uv_run(g_pLoop, UV_RUN_DEFAULT);
fprintf(stderr, "%2d: g_async_called=%3d, thread=%lx: %s\n",
++g_line_count, g_async_called, uv_thread_self(), "worker_main() end");
}
void init_worker() {
fprintf(stderr, "%2d: g_async_called=%3d, thread=%lx: %s\n",
++g_line_count, g_async_called, uv_thread_self(), "init_worker() start");
int dummy = 0;
uv_thread_t id;
uv_async_init( uv_default_loop(), &g_async[ 0], node_print_main);
uv_thread_create(&id, worker_main, &dummy);
fprintf(stderr, "%2d: g_async_called=%3d, thread=%lx: %s\n",
++g_line_count, g_async_called, uv_thread_self(), "init_worker() end");
}
//-------------------------------------------------------------------------
Handle<Value> RunCallback(const Arguments& args) {
HandleScope scope;
return scope.Close(Undefined());
}
void Init(Handle<Object> exports, Handle<Object> module) {
module->Set(String::NewSymbol("exports"),
FunctionTemplate::New(RunCallback)->GetFunction());
init_worker();
}
NODE_MODULE(addon, Init)
//#include <cstdarg>
//extern "C" void node_printf( const char* p_format, ...);
//int node_main( void);
typedef int (*PFunc)( const char*, ...);
extern "C" {
void plugin_init( PFunc node_printf);
void plugin_main();
}
//
// Node.js test program
// 2013-04-13 by open.tommie@gmail.com
//
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include "plugin.h"
#define printf pfNode_printf
//#define main node_main
static int (*pfNode_printf)( const char* p_format, ...);
void plugin_init( PFunc p_node_printf)
{
fprintf( stderr, "plugin_init()\n");
pfNode_printf = p_node_printf;
}
void plugin_main()
{
int size = 3600;
double percentage;
int downloaded = 0;
int send_count = 0;
fprintf( stderr, "plugin_main()\n");
printf("plugin_main(): %2d: from=%lx start\n",
++send_count);
time_t t = time(NULL);
while (downloaded < size) {
percentage = downloaded*100.0/size;
printf("plugin_main(): %2d: from=%lx Downloaded %.2f%%\n",
++send_count, percentage);
// 一秒消費
while ( t == time(NULL)) {
for ( int i = 0; i < 10000; i++) {
; // dummy
}
}
t = time(NULL);
downloaded++;
}
printf("plugin_main(): %2d: from=%lx Download END\n",
++send_count);
// return 0;
}
var debug = require('util').debug;
debug( "node main.start");
var io = require('socket.io').listen(10080);
io.set('log level', 1); // reduce logging
var addon = require('./build/Release/addon');
zero = function( str) {
return ("0"+str).substr(-2);
}
var g_buff = [];
var g_sockets = [];
foo = function( msg) {
msg = msg.replace(/\n$/g,''); // remove new line '\n'
debug("node foo() msg="+msg);
var date = new Date();
var time = zero(date.getHours()) + ":" +
zero(date.getMinutes()) + ":" + zero(date.getSeconds());
var time_msg = time + " " + msg;
g_buff.push( time_msg);
if ( g_buff.length > 20) {
g_buff.shift();
}
g_sockets.forEach(function(socket) {
socket.send( time_msg);
});
}
var interval_sec = 5000;
var total_sec = 0;
setInterval( function() {
total_sec += interval_sec;
foo( "--- total_sec="+ total_sec/1000);
}, interval_sec);
io.sockets.on('connection', function (socket) {
g_sockets.push( socket);
socket.on('message', function () {
debug( "onMessage()");
});
g_buff.forEach( function( msg) {
socket.send( msg);
});
socket.on('disconnect', function () {
debug( "onDisconnect()");
delete g_sockets[ socket];
});
});
debug( "node main.end");
var http = require("http"),
url = require("url"),
path = require("path"),
fs = require("fs")
port = process.argv[2] || 8080;
http.createServer(function(request, response) {
var uri = url.parse(request.url).pathname
, filename = path.join(process.cwd(), uri);
var contentTypesByExtension = {
'.html': "text/html",
'.css': "text/css",
'.js': "text/javascript"
};
path.exists(filename, function(exists) {
if(!exists) {
response.writeHead(404, {"Content-Type": "text/plain"});
response.write("404 Not Found\n");
response.end();
return;
}
if (fs.statSync(filename).isDirectory()) filename += '/index.html';
fs.readFile(filename, "binary", function(err, file) {
if(err) {
response.writeHead(500, {"Content-Type": "text/plain"});
response.write(err + "\n");
response.end();
return;
}
var headers = {};
var contentType = contentTypesByExtension[path.extname(filename)];
if (contentType) headers["Content-Type"] = contentType;
response.writeHead(200, headers);
response.write(file, "binary");
response.end();
});
});
}).listen(parseInt(port, 10));
console.log("Static file server running at\n => http://localhost:" + port + "/\nCTRL + C to shutdown");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment