Skip to content

Instantly share code, notes, and snippets.

@open-tommie
Forked from shigeki/main.c
Last active December 15, 2015 23:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save open-tommie/5342559 to your computer and use it in GitHub Desktop.
Save open-tommie/5342559 to your computer and use it in GitHub Desktop.
●概要
  libuvを用いたワーカースレッドでのスレッド間通信の検証プログラムです。
  Node.js関数へメッセージを送信することが最終目標です。
  Node.jsメーリングリストから。
●main.c変更点
・ワーカー関数をひとつにしました。
 fake_download1(), after1()を削除。
・fake_download0()内で使用していたmsgは
 スタック変数であり、スレッドセーフではないので
 malloc()で取得したヒープ上メモリ領域を使うように変更しました。(main.c 48行目)
 使用済みメモリ領域はprint_progress()内でfree()しています。(main.c 33行目)
・uv_async_send()を実行してからprint_progress()が呼ばれる間に
 複数回uv_async_send()が呼ばれても、print_progress()の実行は1回だけ
 です(uv_async_send()の仕様)。
 この場合、全てのmsgが処理されるわけではない、ということになります。
 これを防ぐ為に、async.dataでmsgを送信するのをやめて、
 代わりにmsg用のキューg_msgQueueを使いました。(main.c 26~28, 53行目)
 uv_async_send()はprint_progress()呼び出しタイミングを通知するだけになりました。
・STLのqueueはスレッドセーフではないので、queue操作の前後に
 uv_mutex_lock(), uv_mutex_unlock()を加えて、
 スレッドセーフにしています。
●log.txtログの解説
1: メインループスレッドのスレッドidは7275c740
2: from=7275a700がmsgを送信した側のスレッドid、送信した回数はsend_count=1回目
3~4: fromが2と異なるので別のスレッドが送信している
2~4: async_called=1で同じ番号なのは、
 1度だけ呼び出されたprint_progress()内で
 キューに溜まった3こ分のmsgを出力したことを示す。
22~24, 29: fake_download0()処理が終了したので、after()が呼ばれた。
 スレッドidが1と同じなので、after()はメインループスレッドで実行されている
 closedカウントが増えている
25~28: 新しいワーカーを開始した。
 25はfrom=71f59700なので、5と同じスレッドidで実行している。
 スレッドプールで、スレッドを使いまわしていることが分かる。
61: 最終的にmain.c 83行目 WORKER_MAXで指定した10このワーカーが実行終了した
62: main.c 70行目、after0()内のuv_close()が実行され、main()内のuv_run()を抜けた
●もうちょっとやりたいこと
最終的にはNode.jsコールバック関数を呼ぶことなので
そこまで検証したいところ。
print_progress()内fprintf()の代わりにコールバック関数を
呼び出せば良さそう。
(main.c 30行目)
Callbacks
http://nodejs.jp/nodejs.org_ja/docs/v0.10/api/addons.html#addons_callbacks
でもその前に、main.cをアドオン化しないと、
コールバック関数が使えない。
こんな時、Node.jsが埋め込みサポートしていれば、
アドオン化しなくて良いのにぃ……。
●注意点
・Fedora18でテストしたので、pthread_self()などを
 Windows用関数に変更する必要があります。
Linux vf2 3.8.5-201.fc18.x86_64 #1 SMP Thu Mar 28 21:01:19 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux
1: async_called= 0, thread=d7800740: main() start
2: async_called= 1, thread=d7800740: from=d77fe700(send_count=1) Downloaded 0.00%
3: async_called= 1, thread=d7800740: from=d77fe700(send_count=2) Downloaded 5.00%
4: async_called= 1, thread=d7800740: from=d77fe700(send_count=3) Downloaded 10.00%
5: async_called= 1, thread=d7800740: from=d77fe700(send_count=4) Downloaded 15.00%
6: async_called= 1, thread=d7800740: from=d77fe700(send_count=5) Downloaded 20.00%
7: async_called= 1, thread=d7800740: from=d77fe700(send_count=6) Downloaded 25.00%
8: async_called= 1, thread=d7800740: from=d77fe700(send_count=7) Downloaded 30.00%
9: async_called= 1, thread=d7800740: from=d77fe700(send_count=8) Downloaded 35.00%
10: async_called= 3, thread=d7800740: from=d77fe700(send_count=9) Downloaded 40.00%
11: async_called= 3, thread=d7800740: from=d77fe700(send_count=10) Downloaded 45.00%
12: async_called= 3, thread=d7800740: from=d77fe700(send_count=11) Downloaded 50.00%
13: async_called= 3, thread=d7800740: from=d77fe700(send_count=12) Downloaded 55.00%
14: async_called= 3, thread=d7800740: from=d77fe700(send_count=13) Downloaded 60.00%
15: async_called= 3, thread=d7800740: from=d77fe700(send_count=14) Downloaded 65.00%
16: async_called= 3, thread=d7800740: from=d77fe700(send_count=15) Downloaded 70.00%
17: async_called= 3, thread=d7800740: from=d77fe700(send_count=16) Downloaded 75.00%
18: async_called= 3, thread=d7800740: from=d77fe700(send_count=17) Downloaded 80.00%
19: async_called= 3, thread=d7800740: from=d77fe700(send_count=18) Downloaded 85.00%
20: async_called= 3, thread=d7800740: from=d77fe700(send_count=19) Downloaded 90.00%
21: async_called= 3, thread=d7800740: from=d77fe700(send_count=20) Downloaded 95.00%
22: async_called= 4, thread=d7800740: from=d67fc700(send_count=1) Downloaded 0.00%
23: async_called= 4, thread=d7800740: from=d67fc700(send_count=2) Downloaded 5.00%
24: async_called= 4, thread=d7800740: from=d67fc700(send_count=3) Downloaded 10.00%
25: async_called= 4, thread=d7800740: from=d67fc700(send_count=4) Downloaded 15.00%
26: async_called= 4, thread=d7800740: from=d67fc700(send_count=5) Downloaded 20.00%
27: async_called= 4, thread=d7800740: from=d67fc700(send_count=6) Downloaded 25.00%
28: async_called= 4, thread=d7800740: from=d67fc700(send_count=7) Downloaded 30.00%
from=d77fe700(send_count=20) Downloaded END
29: async_called= 5, thread=d7800740: after(), closed=1
30: async_called= 6, thread=d7800740: from=d67fc700(send_count=8) Downloaded 35.00%
31: async_called= 6, thread=d7800740: from=d67fc700(send_count=9) Downloaded 40.00%
32: async_called= 6, thread=d7800740: from=d67fc700(send_count=10) Downloaded 45.00%
33: async_called= 6, thread=d7800740: from=d67fc700(send_count=11) Downloaded 50.00%
34: async_called= 6, thread=d7800740: from=d67fc700(send_count=12) Downloaded 55.00%
35: async_called= 6, thread=d7800740: from=d67fc700(send_count=13) Downloaded 60.00%
36: async_called= 7, thread=d7800740: from=d6ffd700(send_count=1) Downloaded 0.00%
37: async_called= 8, thread=d7800740: from=d6ffd700(send_count=2) Downloaded 5.00%
38: async_called= 9, thread=d7800740: from=d67fc700(send_count=14) Downloaded 65.00%
39: async_called= 9, thread=d7800740: from=d67fc700(send_count=15) Downloaded 70.00%
40: async_called= 9, thread=d7800740: from=d67fc700(send_count=16) Downloaded 75.00%
41: async_called= 9, thread=d7800740: from=d67fc700(send_count=17) Downloaded 80.00%
42: async_called= 9, thread=d7800740: from=d67fc700(send_count=18) Downloaded 85.00%
43: async_called= 9, thread=d7800740: from=d67fc700(send_count=19) Downloaded 90.00%
44: async_called= 9, thread=d7800740: from=d67fc700(send_count=20) Downloaded 95.00%
45: async_called= 9, thread=d7800740: from=d6ffd700(send_count=3) Downloaded 10.00%
46: async_called= 9, thread=d7800740: from=d6ffd700(send_count=4) Downloaded 15.00%
47: async_called= 9, thread=d7800740: from=d6ffd700(send_count=5) Downloaded 20.00%
from=d67fc700(send_count=20) Downloaded END
48: async_called= 10, thread=d7800740: after(), closed=2
49: async_called= 11, thread=d7800740: from=d6ffd700(send_count=6) Downloaded 25.00%
50: async_called= 11, thread=d7800740: from=d6ffd700(send_count=7) Downloaded 30.00%
51: async_called= 11, thread=d7800740: from=d6ffd700(send_count=8) Downloaded 35.00%
52: async_called= 11, thread=d7800740: from=d6ffd700(send_count=9) Downloaded 40.00%
53: async_called= 11, thread=d7800740: from=d6ffd700(send_count=10) Downloaded 45.00%
54: async_called= 11, thread=d7800740: from=d6ffd700(send_count=11) Downloaded 50.00%
55: async_called= 11, thread=d7800740: from=d6ffd700(send_count=12) Downloaded 55.00%
56: async_called= 11, thread=d7800740: from=d6ffd700(send_count=13) Downloaded 60.00%
57: async_called= 11, thread=d7800740: from=d6ffd700(send_count=14) Downloaded 65.00%
58: async_called= 11, thread=d7800740: from=d6ffd700(send_count=15) Downloaded 70.00%
59: async_called= 11, thread=d7800740: from=d6ffd700(send_count=16) Downloaded 75.00%
60: async_called= 11, thread=d7800740: from=d6ffd700(send_count=17) Downloaded 80.00%
61: async_called= 11, thread=d7800740: from=d6ffd700(send_count=18) Downloaded 85.00%
62: async_called= 11, thread=d7800740: from=d6ffd700(send_count=19) Downloaded 90.00%
63: async_called= 11, thread=d7800740: from=d6ffd700(send_count=20) Downloaded 95.00%
from=d6ffd700(send_count=20) Downloaded END
64: async_called= 11, thread=d7800740: after(), closed=3
65: async_called= 11, thread=d7800740: main() end
//
// libuv thread test program
// 2013-04-11 mod. by open.tommie@gmail.com
//
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <queue>
#include "uv.h"
using namespace std;
static uv_loop_t *loop;
static int closed = 0;
static int async_called = 0;
static uv_mutex_t g_mutex;
static int g_line_count = 0;
const int WORKER_MAX = 3;
static uv_async_t g_async[ WORKER_MAX];
static int g_thread_no[ WORKER_MAX];
const int MSG_SIZE = 256;
struct QueMessage {
enum KIND { KIND_MESSAGE, KIND_CLOSE} m_kind;
int m_iThread_no;
char m_cMessage[ MSG_SIZE]; // message string
};
static queue<QueMessage*> g_msgQueue;
void print_progress(uv_async_t *handle, int status /*UNUSED*/) {
async_called++;
uv_mutex_lock(&g_mutex);
while ( !g_msgQueue.empty()) {
QueMessage* pMsg = g_msgQueue.front();
g_msgQueue.pop();
switch ( pMsg->m_kind) {
case QueMessage::KIND_MESSAGE:
fprintf(stderr, "%2d: async_called=%3d, thread=%x, From=%d, KIND_MSG: %s\n",
++g_line_count, async_called, uv_thread_self(),
pMsg->m_iThread_no, pMsg->m_cMessage);
break;
case QueMessage::KIND_CLOSE:
fprintf(stderr, "%2d: async_called=%3d, thread=%x: From=%d, KIND_CLOSE\n",
++g_line_count, async_called, uv_thread_self(),
pMsg->m_iThread_no);
uv_close( (uv_handle_t*)&g_async[ pMsg->m_iThread_no], NULL);
break;
}
free( pMsg);
}
uv_mutex_unlock(&g_mutex);
//sleep(1);
}
void fake_download0(uv_work_t *req) {
int thread_no = *(int*)( req->data);
double percentage;
int downloaded = 0;
int send_count = 0;
const int size = 20;
QueMessage *pMsg = NULL;
while (downloaded < size) {
send_count++;
percentage = downloaded*100.0/size;
pMsg = (QueMessage*)malloc( sizeof( QueMessage));
pMsg->m_kind = QueMessage::KIND_MESSAGE;
pMsg->m_iThread_no = thread_no;
sprintf( pMsg->m_cMessage, "from=%d.%x(send_count=%d) Downloaded %.2f%%",
thread_no, uv_thread_self(), send_count, percentage);
uv_mutex_lock(&g_mutex);
g_msgQueue.push( pMsg);
uv_mutex_unlock(&g_mutex);
uv_async_send(&g_async[ thread_no]);
downloaded++;
}
// notify end of worker
pMsg = (QueMessage*)malloc( sizeof( QueMessage));
pMsg->m_kind = QueMessage::KIND_CLOSE;
pMsg->m_iThread_no = thread_no;
uv_mutex_lock(&g_mutex);
g_msgQueue.push( pMsg);
uv_mutex_unlock(&g_mutex);
uv_async_send(&g_async[ thread_no]);
fprintf(stderr, "from=%d.%x(send_count=%d) Downloaded END\n",
thread_no, uv_thread_self(), send_count);
}
void after0(uv_work_t *req, int status) {
int thread_no = *((int*) req->data);
closed++;
fprintf(stderr, "%2d: async_called=%3d, thread=%d.%x: after(), closed=%d\n",
++g_line_count, async_called, thread_no,
uv_thread_self(), closed);
}
int main() {
loop = uv_default_loop();
uv_work_t req[ WORKER_MAX];
uv_mutex_init(&g_mutex);
fprintf(stderr, "%2d: async_called=%3d, thread=%x: %s\n",
++g_line_count, async_called, uv_thread_self(), "main() start");
for ( int i = 0; i < WORKER_MAX; i++) {
g_thread_no[ i] = i;
uv_async_init(loop, &g_async[ i], print_progress);
req[ i].data = (void*) &g_thread_no[ i];
uv_queue_work(loop, req+i, fake_download0, after0);
}
uv_run(loop, UV_RUN_DEFAULT);
fprintf(stderr, "%2d: async_called=%3d, thread=%x: %s\n",
++g_line_count, async_called, uv_thread_self(), "main() end");
}
CFLAGS=-c -I/usr/local/src/libuv/include -D_GNU_SOURCE -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64 -pthread
all: ./worker
worker: main.o
g++ -o worker -L/usr/local/src/libuv main.o -luv -lpthread -lrt -lstdc++
main.o: main.c
g++ $(CFLAGS) main.c
#EOF
#!/usr/bin/bash
(uname -a
./worker 2>&1 )| tee log.txt
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment