Skip to content

Instantly share code, notes, and snippets.

@ms747
Last active October 24, 2023 11:05
Show Gist options
  • Save ms747/5259dc660460c0817da4ce22257de813 to your computer and use it in GitHub Desktop.
Save ms747/5259dc660460c0817da4ce22257de813 to your computer and use it in GitHub Desktop.
IO_Uring server
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <liburing.h>
#define PORT 8080
#define MAX_CONNECTIONS 4096
#define BACKLOG 512
#define MAX_MESSAGE_LEN 2048
#define BUFFERS_COUNT MAX_CONNECTIONS
#define HELLO_RESPONSE "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 14\r\n\r\nHello, World!\n"
void add_accept(struct io_uring *ring, int fd, struct sockaddr *client_addr, socklen_t *client_len, unsigned flags);
void add_socket_read(struct io_uring *ring, int fd, unsigned gid, size_t size, unsigned flags);
void add_socket_write(struct io_uring *ring, int fd, __u16 bid, size_t size, unsigned flags);
void add_provide_buf(struct io_uring *ring, __u16 bid, unsigned gid);
enum {
ACCEPT,
READ,
WRITE,
PROV_BUF,
};
typedef struct conn_info {
__u32 fd;
__u16 type;
__u16 bid;
} conn_info;
char bufs[BUFFERS_COUNT][MAX_MESSAGE_LEN] = {0};
int group_id = 1337;
int main(int argc, char *argv[]) {
struct sockaddr_in serv_addr, client_addr;
socklen_t client_len = sizeof(client_addr);
int sock_listen_fd = socket(AF_INET, SOCK_STREAM, 0);
const int val = 1;
setsockopt(sock_listen_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(PORT);
serv_addr.sin_addr.s_addr = INADDR_ANY;
if (bind(sock_listen_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
perror("Error binding socket...\n");
exit(1);
}
if (listen(sock_listen_fd, BACKLOG) < 0) {
perror("Error listening on socket...\n");
exit(1);
}
printf("io_uring echo server listening for connections on port: %d\n", PORT);
struct io_uring_params params;
struct io_uring ring;
memset(&params, 0, sizeof(params));
if (io_uring_queue_init_params(2048, &ring, &params) < 0) {
perror("io_uring_init_failed...\n");
exit(1);
}
if (!(params.features & IORING_FEAT_FAST_POLL)) {
exit(0);
}
struct io_uring_probe *probe;
probe = io_uring_get_probe_ring(&ring);
if (!probe || !io_uring_opcode_supported(probe, IORING_OP_PROVIDE_BUFFERS)) {
exit(0);
}
struct io_uring_sqe *sqe;
struct io_uring_cqe *cqe;
sqe = io_uring_get_sqe(&ring);
memcpy(bufs[0], HELLO_RESPONSE, strlen(HELLO_RESPONSE));
io_uring_prep_provide_buffers(sqe, bufs, MAX_MESSAGE_LEN, BUFFERS_COUNT, group_id, 0);
io_uring_submit(&ring);
io_uring_wait_cqe(&ring, &cqe);
if (cqe->res < 0) {
printf("cqe->res = %d\n", cqe->res);
exit(1);
}
io_uring_cqe_seen(&ring, cqe);
add_accept(&ring, sock_listen_fd, (struct sockaddr *)&client_addr, &client_len, 0);
while (1) {
io_uring_submit_and_wait(&ring, 1);
struct io_uring_cqe *cqe;
unsigned head;
unsigned count = 0;
io_uring_for_each_cqe(&ring, head, cqe) {
++count;
struct conn_info conn_i;
memcpy(&conn_i, &cqe->user_data, sizeof(conn_i));
int type = conn_i.type;
if (cqe->res == -ENOBUFS) {
exit(1);
} else if (type == PROV_BUF) {
if (cqe->res < 0) {
printf("cqe->res = %d\n", cqe->res);
exit(1);
}
} else if (type == ACCEPT) {
int sock_conn_fd = cqe->res;
if (sock_conn_fd >= 0) {
add_socket_read(&ring, sock_conn_fd, group_id, MAX_MESSAGE_LEN, IOSQE_BUFFER_SELECT);
}
add_accept(&ring, sock_listen_fd, (struct sockaddr *)&client_addr, &client_len, 0);
} else if (type == READ) {
int bytes_read = cqe->res;
int bid = cqe->flags >> 16;
if (cqe->res <= 0) {
add_provide_buf(&ring, bid, group_id);
close(conn_i.fd);
} else {
add_socket_write(&ring, conn_i.fd, bid, bytes_read, 0);
}
} else if (type == WRITE) {
add_provide_buf(&ring, conn_i.bid, group_id);
add_socket_read(&ring, conn_i.fd, group_id, MAX_MESSAGE_LEN, IOSQE_BUFFER_SELECT);
}
}
io_uring_cq_advance(&ring, count);
}
}
void add_accept(struct io_uring *ring, int fd, struct sockaddr *client_addr, socklen_t *client_len, unsigned flags) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_accept(sqe, fd, client_addr, client_len, 0);
io_uring_sqe_set_flags(sqe, flags);
conn_info conn_i = {
.fd = fd,
.type = ACCEPT,
};
memcpy(&sqe->user_data, &conn_i, sizeof(conn_i));
}
void add_socket_read(struct io_uring *ring, int fd, unsigned gid, size_t message_size, unsigned flags) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_recv(sqe, fd, NULL, message_size, 0);
io_uring_sqe_set_flags(sqe, flags);
sqe->buf_group = gid;
conn_info conn_i = {
.fd = fd,
.type = READ,
};
memcpy(&sqe->user_data, &conn_i, sizeof(conn_i));
}
void add_socket_write(struct io_uring *ring, int fd, __u16 bid, size_t message_size, unsigned flags) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_send(sqe, fd, &bufs[0], sizeof(HELLO_RESPONSE), 0);
io_uring_sqe_set_flags(sqe, flags);
conn_info conn_i = {
.fd = fd,
.type = WRITE,
.bid = bid,
};
memcpy(&sqe->user_data, &conn_i, sizeof(conn_i));
}
void add_provide_buf(struct io_uring *ring, __u16 bid, unsigned gid) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_provide_buffers(sqe, bufs[bid], MAX_MESSAGE_LEN, 1, gid, bid);
conn_info conn_i = {
.fd = 0,
.type = PROV_BUF,
};
memcpy(&sqe->user_data, &conn_i, sizeof(conn_i));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment