Last active
June 1, 2021 08:11
-
-
Save losophy/a8f2ed4325034cef37519ef98aa9ce3a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
struct socket_server * | |
socket_server_create(uint64_t time) { | |
int i; | |
int fd[2]; | |
poll_fd efd = sp_create(); | |
if (sp_invalid(efd)) { | |
skynet_error(NULL, "socket-server: create event pool failed."); | |
return NULL; | |
} | |
if (pipe(fd)) {//建立管道 | |
sp_release(efd); | |
skynet_error(NULL, "socket-server: create socket pair failed."); | |
return NULL; | |
} | |
if (sp_add(efd, fd[0], NULL)) { | |
// add recvctrl_fd to event poll | |
skynet_error(NULL, "socket-server: can't add server fd to event pool."); | |
close(fd[0]); | |
close(fd[1]); | |
sp_release(efd); | |
return NULL; | |
} | |
struct socket_server *ss = MALLOC(sizeof(*ss)); | |
ss->time = time; | |
ss->event_fd = efd; | |
ss->recvctrl_fd = fd[0];//读取端给recvctrl_fd | |
ss->sendctrl_fd = fd[1];//写入端给sendctrl_fd | |
ss->checkctrl = 1; | |
for (i=0;i<MAX_SOCKET;i++) { | |
struct socket *s = &ss->slot[i]; | |
s->type = SOCKET_TYPE_INVALID; | |
clear_wb_list(&s->high); | |
clear_wb_list(&s->low); | |
spinlock_init(&s->dw_lock); | |
} | |
ss->alloc_id = 0; | |
ss->event_n = 0; | |
ss->event_index = 0; | |
memset(&ss->soi, 0, sizeof(ss->soi)); | |
FD_ZERO(&ss->rfds); | |
assert(ss->recvctrl_fd < FD_SETSIZE); | |
return ss; | |
} | |
static void * | |
thread_socket(void *p) { | |
struct monitor * m = p; | |
skynet_initthread(THREAD_SOCKET); | |
for (;;) { | |
int r = skynet_socket_poll(); | |
if (r==0) | |
// 退出网络轮询,即网络线程退出 | |
break; | |
if (r<0) { | |
CHECK_ABORT | |
// 一般r=-1,表示还有剩余网络事件需要处理 | |
continue; | |
} | |
// r>0, 表示捕获到新的网络事件,若所有worker全部都处于sleep,则唤醒一个 | |
wakeup(m,0); | |
} | |
return NULL; | |
} | |
int | |
skynet_socket_poll() { | |
struct socket_server *ss = SOCKET_SERVER; | |
assert(ss); | |
struct socket_message result; | |
int more = 1; | |
int type = socket_server_poll(ss, &result, &more); | |
switch (type) { | |
case SOCKET_EXIT: | |
return 0; | |
case SOCKET_DATA: | |
forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result); | |
break; | |
case SOCKET_CLOSE: | |
forward_message(SKYNET_SOCKET_TYPE_CLOSE, false, &result); | |
break; | |
case SOCKET_OPEN: | |
forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result); | |
break; | |
case SOCKET_ERR: | |
forward_message(SKYNET_SOCKET_TYPE_ERROR, true, &result); | |
break; | |
case SOCKET_ACCEPT: | |
forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result); | |
break; | |
case SOCKET_UDP: | |
forward_message(SKYNET_SOCKET_TYPE_UDP, false, &result); | |
break; | |
case SOCKET_WARNING: | |
forward_message(SKYNET_SOCKET_TYPE_WARNING, false, &result); | |
break; | |
default: | |
skynet_error(NULL, "Unknown socket message type %d.",type); | |
return -1; | |
} | |
if (more) { | |
return -1; | |
} | |
return 1; | |
} | |
//该接口处理由操作系统提供的多路复用IO接口所捕获到的网络事件,包括内部网络命令和外部网络消息,然后根据其处理后的返回的网络消息类型,将返回结果转发给对应的服务(context) | |
// return type | |
int | |
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) { | |
for (;;) { | |
if (ss->checkctrl) { | |
if (has_cmd(ss)) { | |
int type = ctrl_cmd(ss, result); | |
if (type != -1) { | |
clear_closed_event(ss, result, type); | |
return type; | |
} else | |
continue; | |
} else { | |
ss->checkctrl = 0; | |
} | |
} | |
if (ss->event_index == ss->event_n) { | |
ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT); | |
ss->checkctrl = 1; | |
if (more) { | |
*more = 0; | |
} | |
ss->event_index = 0; | |
if (ss->event_n <= 0) { | |
ss->event_n = 0; | |
if (errno == EINTR) { | |
continue; | |
} | |
return -1; | |
} | |
} | |
struct event *e = &ss->ev[ss->event_index++]; | |
struct socket *s = e->s; | |
if (s == NULL) { | |
// dispatch pipe message at beginning | |
continue; | |
} | |
struct socket_lock l; | |
socket_lock_init(s, &l); | |
switch (s->type) { | |
case SOCKET_TYPE_CONNECTING: | |
return report_connect(ss, s, &l, result); | |
case SOCKET_TYPE_LISTEN: { | |
int ok = report_accept(ss, s, result); | |
if (ok > 0) { | |
return SOCKET_ACCEPT; | |
} if (ok < 0 ) { | |
return SOCKET_ERR; | |
} | |
// when ok == 0, retry | |
break; | |
} | |
case SOCKET_TYPE_INVALID: | |
skynet_error(NULL, "socket-server: invalid socket"); | |
break; | |
default: | |
if (e->read) { | |
int type; | |
if (s->protocol == PROTOCOL_TCP) { | |
type = forward_message_tcp(ss, s, &l, result); | |
} else { | |
type = forward_message_udp(ss, s, &l, result); | |
if (type == SOCKET_UDP) { | |
// try read again | |
--ss->event_index; | |
return SOCKET_UDP; | |
} | |
} | |
if (e->write && type != SOCKET_CLOSE && type != SOCKET_ERR) { | |
// Try to dispatch write message next step if write flag set. | |
e->read = false; | |
--ss->event_index; | |
} | |
if (type == -1) | |
break; | |
return type; | |
} | |
if (e->write) { | |
int type = send_buffer(ss, s, &l, result); | |
if (type == -1) | |
break; | |
return type; | |
} | |
if (e->error) { | |
// close when error | |
int error; | |
socklen_t len = sizeof(error); | |
int code = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &error, &len); | |
const char * err = NULL; | |
if (code < 0) { | |
err = strerror(errno); | |
} else if (error != 0) { | |
err = strerror(error); | |
} else { | |
err = "Unknown error"; | |
} | |
force_close(ss, s, &l, result); | |
result->data = (char *)err; | |
return SOCKET_ERR; | |
} | |
if(e->eof) { | |
force_close(ss, s, &l, result); | |
return SOCKET_CLOSE; | |
} | |
break; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
struct socket_server { | |
volatile uint64_t time; // 等同于 TI->current | |
int recvctrl_fd; // 管道读端 | |
int sendctrl_fd; // 管道写端 | |
int checkctrl; // 是否需要检查内部命令的标识 | |
poll_fd event_fd; // event poll | |
int alloc_id; // socket id分配器 | |
int event_n; // 一次捕获到事件总数,该值小于等于 MAX_EVENT | |
int event_index; // 已处理过的事件索引 | |
struct socket_object_interface soi; // userobjuect 接口 | |
struct event ev[MAX_EVENT]; // 捕获的事件数组 | |
struct socket slot[MAX_SOCKET]; // socket 数组 | |
char buffer[MAX_INFO]; // 临时存放需要转发给其他服务的消息数据 | |
uint8_t udpbuffer[MAX_UDP_PACKAGE]; // 接收的udp包 | |
fd_set rfds; // select fd set | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment