Skip to content

Instantly share code, notes, and snippets.

@losophy
Last active June 1, 2021 08:11
Show Gist options
  • Save losophy/a8f2ed4325034cef37519ef98aa9ce3a to your computer and use it in GitHub Desktop.
Save losophy/a8f2ed4325034cef37519ef98aa9ce3a to your computer and use it in GitHub Desktop.
struct socket_server *
socket_server_create(uint64_t time) {
int i;
int fd[2];
poll_fd efd = sp_create();
if (sp_invalid(efd)) {
skynet_error(NULL, "socket-server: create event pool failed.");
return NULL;
}
if (pipe(fd)) {//建立管道
sp_release(efd);
skynet_error(NULL, "socket-server: create socket pair failed.");
return NULL;
}
if (sp_add(efd, fd[0], NULL)) {
// add recvctrl_fd to event poll
skynet_error(NULL, "socket-server: can't add server fd to event pool.");
close(fd[0]);
close(fd[1]);
sp_release(efd);
return NULL;
}
struct socket_server *ss = MALLOC(sizeof(*ss));
ss->time = time;
ss->event_fd = efd;
ss->recvctrl_fd = fd[0];//读取端给recvctrl_fd
ss->sendctrl_fd = fd[1];//写入端给sendctrl_fd
ss->checkctrl = 1;
for (i=0;i<MAX_SOCKET;i++) {
struct socket *s = &ss->slot[i];
s->type = SOCKET_TYPE_INVALID;
clear_wb_list(&s->high);
clear_wb_list(&s->low);
spinlock_init(&s->dw_lock);
}
ss->alloc_id = 0;
ss->event_n = 0;
ss->event_index = 0;
memset(&ss->soi, 0, sizeof(ss->soi));
FD_ZERO(&ss->rfds);
assert(ss->recvctrl_fd < FD_SETSIZE);
return ss;
}
static void *
thread_socket(void *p) {
struct monitor * m = p;
skynet_initthread(THREAD_SOCKET);
for (;;) {
int r = skynet_socket_poll();
if (r==0)
// 退出网络轮询,即网络线程退出
break;
if (r<0) {
CHECK_ABORT
// 一般r=-1,表示还有剩余网络事件需要处理
continue;
}
// r>0, 表示捕获到新的网络事件,若所有worker全部都处于sleep,则唤醒一个
wakeup(m,0);
}
return NULL;
}
int
skynet_socket_poll() {
struct socket_server *ss = SOCKET_SERVER;
assert(ss);
struct socket_message result;
int more = 1;
int type = socket_server_poll(ss, &result, &more);
switch (type) {
case SOCKET_EXIT:
return 0;
case SOCKET_DATA:
forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
break;
case SOCKET_CLOSE:
forward_message(SKYNET_SOCKET_TYPE_CLOSE, false, &result);
break;
case SOCKET_OPEN:
forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
break;
case SOCKET_ERR:
forward_message(SKYNET_SOCKET_TYPE_ERROR, true, &result);
break;
case SOCKET_ACCEPT:
forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result);
break;
case SOCKET_UDP:
forward_message(SKYNET_SOCKET_TYPE_UDP, false, &result);
break;
case SOCKET_WARNING:
forward_message(SKYNET_SOCKET_TYPE_WARNING, false, &result);
break;
default:
skynet_error(NULL, "Unknown socket message type %d.",type);
return -1;
}
if (more) {
return -1;
}
return 1;
}
//该接口处理由操作系统提供的多路复用IO接口所捕获到的网络事件,包括内部网络命令和外部网络消息,然后根据其处理后的返回的网络消息类型,将返回结果转发给对应的服务(context)
// return type
int
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
for (;;) {
if (ss->checkctrl) {
if (has_cmd(ss)) {
int type = ctrl_cmd(ss, result);
if (type != -1) {
clear_closed_event(ss, result, type);
return type;
} else
continue;
} else {
ss->checkctrl = 0;
}
}
if (ss->event_index == ss->event_n) {
ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);
ss->checkctrl = 1;
if (more) {
*more = 0;
}
ss->event_index = 0;
if (ss->event_n <= 0) {
ss->event_n = 0;
if (errno == EINTR) {
continue;
}
return -1;
}
}
struct event *e = &ss->ev[ss->event_index++];
struct socket *s = e->s;
if (s == NULL) {
// dispatch pipe message at beginning
continue;
}
struct socket_lock l;
socket_lock_init(s, &l);
switch (s->type) {
case SOCKET_TYPE_CONNECTING:
return report_connect(ss, s, &l, result);
case SOCKET_TYPE_LISTEN: {
int ok = report_accept(ss, s, result);
if (ok > 0) {
return SOCKET_ACCEPT;
} if (ok < 0 ) {
return SOCKET_ERR;
}
// when ok == 0, retry
break;
}
case SOCKET_TYPE_INVALID:
skynet_error(NULL, "socket-server: invalid socket");
break;
default:
if (e->read) {
int type;
if (s->protocol == PROTOCOL_TCP) {
type = forward_message_tcp(ss, s, &l, result);
} else {
type = forward_message_udp(ss, s, &l, result);
if (type == SOCKET_UDP) {
// try read again
--ss->event_index;
return SOCKET_UDP;
}
}
if (e->write && type != SOCKET_CLOSE && type != SOCKET_ERR) {
// Try to dispatch write message next step if write flag set.
e->read = false;
--ss->event_index;
}
if (type == -1)
break;
return type;
}
if (e->write) {
int type = send_buffer(ss, s, &l, result);
if (type == -1)
break;
return type;
}
if (e->error) {
// close when error
int error;
socklen_t len = sizeof(error);
int code = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &error, &len);
const char * err = NULL;
if (code < 0) {
err = strerror(errno);
} else if (error != 0) {
err = strerror(error);
} else {
err = "Unknown error";
}
force_close(ss, s, &l, result);
result->data = (char *)err;
return SOCKET_ERR;
}
if(e->eof) {
force_close(ss, s, &l, result);
return SOCKET_CLOSE;
}
break;
}
}
}
struct socket_server {
volatile uint64_t time; // 等同于 TI->current
int recvctrl_fd; // 管道读端
int sendctrl_fd; // 管道写端
int checkctrl; // 是否需要检查内部命令的标识
poll_fd event_fd; // event poll
int alloc_id; // socket id分配器
int event_n; // 一次捕获到事件总数,该值小于等于 MAX_EVENT
int event_index; // 已处理过的事件索引
struct socket_object_interface soi; // userobjuect 接口
struct event ev[MAX_EVENT]; // 捕获的事件数组
struct socket slot[MAX_SOCKET]; // socket 数组
char buffer[MAX_INFO]; // 临时存放需要转发给其他服务的消息数据
uint8_t udpbuffer[MAX_UDP_PACKAGE]; // 接收的udp包
fd_set rfds; // select fd set
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment