Skip to content

Instantly share code, notes, and snippets.

@dilfish
Last active March 5, 2016 02:48
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save dilfish/44a0a962110b8b1a1dc9 to your computer and use it in GitHub Desktop.
Save dilfish/44a0a962110b8b1a1dc9 to your computer and use it in GitHub Desktop.
last edit: 20140822 15:11
本文分析的代码是 nanomsg-0.4-beta版本。
要创建一个 PUB/SUB 服务,只需要六个 API,分别是:
1. nn_socket
2. nn_bind
3. nn_connect
4. nn_send
5. nn_recv
6. nn_close
1. nn_socket
1.1 nn_global_init
1.2 nn_global_create_socket
1.1.1 nn_global_add_transport (nn_tcp);
1.1.2 nn_global_add_socktype (nn_pub_socktype);
1.1.3 nn_global_add_socktype (nn_sub_socktype);
1.1.4 nn_pool_init (&self.pool);
1.1.5 nn_fsm_init_root (&self.fsm, nn_global_handler, nn_global_shutdown, &self.ctx);
1.1.6 nn_ctx_init (&self.ctx, nn_global_getpool (), NULL);
1.1.7 nn_fsm_start (&self.fsm);
nn_global 中的 transport 和 socktype 分别代表了底层实现方式和用户的使用方式。
这里的 transport 和 socktype 都是 list, nn_global_add_transport 函数可以添加随意多的 transport 和 socktype。
在创建 socket 的时候,遍历查找。
此刻我们只关心 nn_tcp 和 nn_pub_socktype, nn_sub_socktype。
定义如下:
static struct nn_socktype nn_sub_socktype_struct = {
AF_SP, NN_SUB,
NN_SOCKTYPE_FLAG_NOSEND,
nn_xsub_create, nn_xsub_ispeer,
NN_LIST_ITEM_INITIALIZER
};
static struct nn_socktype nn_pub_socktype_struct = {
AF_SP, NN_PUB,
NN_SOCKTYPE_FLAG_NORECV,
nn_xpub_create, nn_xpub_ispeer,
NN_LIST_ITEM_INITIALIZER
};
static struct nn_transport nn_tcp_vfptr = {
"tcp", NN_TCP, NULL, NULL,
nn_tcp_bind, nn_tcp_connect,
nn_tcp_optset,
NN_LIST_ITEM_INITIALIZER
};
struct nn_transport *nn_tcp = &nn_tcp_vfptr;
1.1.4 nn_pool_init
本部分尚未完全完成,每个 pool 中只有一个 worker。
nn_pool_init == nn_worker_init
1.1.4.1 nn_worker_init
int nn_worker_init (struct nn_worker *self)
{
此处 efd 用于进程间的通信方式,底层使用 socketfd 实现。
1.1.4.1.1 nn_poller_init (&self->poller);
1.1.4.1.2 nn_poller_add (&self->poller, nn_efd_getfd (&self->efd), &self->efd_hndl);
1.1.4.1.3 nn_thread_init (&self->thread, nn_worker_routine, self);
}
1.1.4.1.1 nn_poller_init
poller 底层用 epoll 实现,是一个标准的 poller。
1.1.4.1.2 nn_poller_add
此处将前面的 efd 加入到 worker 的 poller,此时 &self->efd_hndl == NULL
1.1.4.1.3 nn_thread_init
此处启动新线程,将 self 传入,进入事件循环。
可以预见,user 线程将事件加入到 worker 线程中后,通过 efd 通知 worker 线程有新事件。
1.1.5 nn_fsm_init_root
此函数将 self 加入到 fsm 中去。处理函数为 nn_global_handler 和 nn_global_shutdown。
其 ctx 即 context 上下文是 self.ctx。
fsm 是类似于 c++ 的一种用法,将数据和处理的函数绑定在一起,处理函数以状态机实现。
fsm 有 owner 的概念,即上层 fsm。一个事件本层的 fsm 处理完以后,想传给下层的 fsm 只需要直接调用即可。
想传给上层的 fsm 则需要调用 nn_fsm_raise 这个函数,另外,nn_fsm_raiseto 可以将事件传递给没关系的某个 fsm。
init_root 则表明此 fsm 没有上层。
1.1.6 nn_ctx_init
ctx 即上下文,每个 ctx 包含一个 pool,每个 pool 中可以包含多个 worker(未完成,当前每个 pool 中一个 worker)。
即每个 ctx 当前有一个 worker 线程。
1.1.7 nn_fsm_start
fsm 状态变化,调用 nn_global_handler 处理。
1.2 nn_global_create_socket
查找合适的 domain 和 protocol,占用一个 self.sock 的位置。然后调用
1.2.1 nn_sock_init
int nn_sock_init (struct nn_sock *self, struct nn_socktype *socktype, int fd)
{
1.2.1.1 nn_ctx_init (&self->ctx, nn_global_getpool (), nn_sock_onleave);
1.2.1.2 nn_fsm_init_root (&self->fsm, nn_sock_handler, nn_sock_shutdown, &self->ctx);
self->state = NN_SOCK_STATE_INIT;
本函数中初始化了两个 efd,分别是 send 和 recv,如果不需要,则关闭。
1.2.1.3 rc = socktype->create ((void*) self, &self->sockbase);
self->socktype = socktype;
1.2.1.4 nn_fsm_start (&self->fsm);
}
1.2.1.1 nn_ctx_init
可以看出每个 sock 有自己独立的 ctx,但是共享一个全局的 pool,即共享一个全局的 worker 线程。
1.2.1.2 nn_fsm_init_root
每个 sock 的 fsm 是独立的,没有上层。使用自己的 ctx,每个 sock 是一个相对独立的单位,不同
的 sock 之间肯定是可以多线程处理的。
1.2.1.3 socktype->create == nn_xpub_create
此函数将 sock 与本身的 sockbase 处理函数连接起来。
static const struct nn_sockbase_vfptr nn_xpub_sockbase_vfptr = {
NULL, nn_xpub_destroy, nn_xpub_add, nn_xpub_rm,
nn_xpub_in, nn_xpub_out, nn_xpub_events, nn_xpub_send,
NULL, nn_xpub_setopt, nn_xpub_getop, };
每个 sock 有自己的 socktype,负责上层调用,比如 create 和 ispeer,以及一些设定。
而本身的 sockbase 则是为了处理数据的,比如 add 一个链接等。
1.2.1.4 nn_fsm_start
此函数将本 sock 的状态从 INIT 改为 ACTIVE,表示本 sock 可以使用了。
到此初始化工作完毕,可以看出 nanomsg 使用了层级式的结构,将概念抽象出来。
添加 transport 和 socktype 十分方便,跟 linux 内核的处理方式颇为相像。
事件的传递也是层级式的,比如一个 sendable 事件,首先将 socket 加入到最底层的 poller 中,当 socket 变
为 sendable 之后,通过 fsm 通知上层,并改变状态到 sendable,上层收到事件以后,改状态,上传,直至用户态,
用户调用 send。
send 事件同理,用户 send 数据,状态机层层改变,直至最底层,改变状态,发送数据,再次加入 poller。
目前每个 ctx 一个 pool,每个 pool 一个 worker。以后如果 worker 可调节,则用户态处理逻辑,worker 处理链接。
如果是网络型的业务,worker 间的协调应该和 nginx 类似,轮流 accept。
如果是计算型的业务,用户多个线程之间类似,可以轮流 recv/send。
2. nn_bind
nn_bind 是服务器端调用的函数,比如 PUB。
2.1 nn_global_create_ep
查找合适的 transport 名字,调用 nn_sock_add_ep
2.1.1 nn_sock_add_ep (self.socks [s], tp, bind, addr);
int nn_sock_add_ep (struct nn_sock *self, struct nn_transport *transport,
int bind, const char *addr)
{
2.1.1.1 rc = nn_ep_init (ep, NN_SOCK_SRC_EP, self, self->eid, transport,
bind, addr);
2.1.1.2 nn_ep_start (ep);
此处将 ep 加入到 sock 里面的 eps 中, eid 从 1 开始递增。
}
2.1.1.1 nn_ep_init
nanomsg 把每一个端点叫做 endpoints,类似于 IP:Port 这样。
类似 socktype, ep 也有 epbase,负责 ep 的销毁。
每个 ep 有自己独立的 fsm,此时,全局 global 有自己的 fsm,每个 sock 有自己的 fsm。
每个 sock 中的每个 ep 有自己的 fsm
int nn_ep_init (struct nn_ep *self, int src, struct nn_sock *sock, int eid,
struct nn_transport *transport, int bind, const char *addr)
{
此处将 fsm 置于 IDLE
if (bind)
2.1.1.1.1 rc = transport->bind ((void*) self, &self->epbase);
else
2.1.1.1.2 rc = transport->connect ((void*) self, &self->epbase);
}
我们这里使用 TCP,则此处的两个函数分别为 nn_btcp_create 和 nn_ctcp_create
2.1.1.1.1 nn_btcp_create
nn_btcp 表示 bind tcp,其结构里包含 atcp,表示 accept tcp
int nn_btcp_create (void *hint, struct nn_epbase **epbase)
{
2.1.1.1.1.1 nn_epbase_init (&self->epbase, &nn_btcp_epbase_vfptr, hint);
2.1.1.1.1.2 nn_fsm_init_root (&self->fsm, nn_btcp_handler, nn_btcp_shutdown,
nn_epbase_getctx (&self->epbase));
2.1.1.1.1.3 nn_usock_init (&self->usock, NN_BTCP_SRC_USOCK, &self->fsm);
2.1.1.1.1.4 nn_fsm_start (&self->fsm);
}
nn_epbase_init
将 btcp_epbase 和本 ep 链接起来。
nn_fsm_init_root
btcp 的 fsm 没有上层。
nn_usock_init
usock 有自己的 fsm,其上层的 fsm 是 btcp 的 fsm
usock 是最底层实现功能的组件,这个结构体里包括了 in 和 out 两个结构体,表示要发出的数据和接收缓存的数据。
包含了一堆的 task,和 worker thread 通信,将任务打包成 nn_worker_task 结构体。
包含了一堆的 event,是本身发起的一系列事件。
nn_fsm_start
btcp 的 fsm start 里执行了两个函数, nn_btcp_start_listening 和 nn_btcp_start_accepting。
然后将状态改为 ACTIVE。表示 btcp 已经可以用了。
nn_btcp_start_listening 里调用了
nn_usock_start, 执行 OS 层的 socket,状态 STARTING
nn_usock_bind, 执行 OS 层的 bind
nn_usock_listen,执行 OS 层的 listen,状态 LISTENING
nn_btcp_start_accepting 里调用了
nn_atcp_init
nn_atcp_start
进入 atcp 的逻辑
tmp.1 nn_atcp_init
btcp.fsm 是 atcp.fsm 的上层
atcp 有自己的 usock,即 atcp 的 usock 为 fd = accept(ls) 中的 fd,而 btcp 的 usock 为其中的 ls。
atcp 又初始化了 stcp,stcp.fsm 的上层为 atcp.fsm。
stcp 没有自己对应的 usock,包含了一个 pipebase,和前类似,pipe 表示一个链接。
stcp 表示 stream tcp,其数据通信前包含一个 streamhdr。
streamhdr 的 fsm 属于 stcp,fsm 真他妈多啊。
pipebase 有自己的 fsm,但他的 fsm 上层是对应的 sock。
tmp.2 nn_atcp_start
swap_owner 之后,atcp 拥有了 btcp 的 usock 的 ownership。
同时还拥有自己的 usock。
自己的 usock 的状态变为 BEING_ACCEPTED
listener 的 usock 状态为 ACCEPT
调用 nn_usock_accept,完成之后,状态从 IDLE 到 ACCEPTING
此时进行 OS 的 accept,如果成功,则 listener 和 atcp 同时到 DONE 状态。
如果失败,则将两者包装成 nn_worker_task,加进 worker 队列,并 signal efd
worker 通过 nn_fsm_feed (task->owner, task->src, NN_WORKER_TASK_EXECUTE, task);
又会调用 atcp 的 accept 进行 accept,死循环。
2.1.1.2 nn_ep_start
修改本 ep 状态为 ACTIVE
nn_bind 将下层的 TCP 链接准备好至 listen 状态,并加入到 worker 的 poller 中去。
可以看到 sock 之下是 ep,ep 之下是 transport,transport 下面是 usock。因为 TCP 复杂,分了
三个 transport:atcp, btcp, stcp。
同时可以看到众多层 fsm 的好处,那就是,将任何一层单元包装成一个含有 fsm 的结构,在 thread 之间
传递,任何线程都可以操作,而且其状态总是一致的,真正做到了异步。
3. nn_connect
4. nn_send
4.1 nn_sendmsg
4.1.1 nn_sock_send
4.1.1.1 self->sockbase->vfptr->send (self->sockbase, msg);
此处的 send 为 nn_xpub_send, nn_xpub_send 又调用 nn_dist_send 将数据最终通过合适的 pipe 发送出去。
本例中为:
const struct nn_pipebase_vfptr nn_stcp_pipebase_vfptr = {
nn_stcp_send,
nn_stcp_recv
};
如果 self->sockbase->vfptr->send 失败,则没有合适的 send pipe。
此时进入 nn_sock->sndfd 等待,返回之后继续 send
可见,nanomsg 整个逻辑是, user 线程只处理同步的操作,异步的操作交给 worker thread,中间通过 efd 来同
步,通过 task 传递事件数据。
5. nn_recv
6. nn_close
@dilfish
Copy link
Author

dilfish commented Sep 1, 2014

nanomsg 中的 trie 树对传统的 trie 树进行了一个修改。

比如在传统的 trie 树中,abc 这个串会形成一个三个节点的树,分别在层一,层二,层三。
nanomsg 中的 trie 树将 prefix 放在一个字符串里面。这样以来,abc 就只有一个节点。如果下次插入 abd,那么:

第一个节点将会分割,ab 成为父节点,子节点为 c 和 d,共三个节点。

其实是因为在 trie 树中,指针所占空间太大做的一个改进,这里的 prefix 有个最大长度,如果超过了最大长度,依然会放在
子节点中,nanomsg 使用的数字是10,这应该是一个经验值,在32位机器和64位机器上应该用不一样的值。

here's another implementation of trie tree:
http://linux.thai.net/~thep/datrie/datrie.html

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment