Skip to content

Instantly share code, notes, and snippets.

@oktal
Created October 24, 2019 12:05
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oktal/64901677ae3820e7b224bf9e196c63d1 to your computer and use it in GitHub Desktop.
Save oktal/64901677ae3820e7b224bf9e196c63d1 to your computer and use it in GitHub Desktop.
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <getopt.h>
#include <unistd.h>
#include <time.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#if defined(WITH_URING)
#include "liburing.h"
#endif
/* Number of slots in the server's fixed-size client table (server.clients). */
#define MAX_CLIENTS 64
/*
 * Command-line configuration. parse_options() initialises the numeric fields
 * to -1 and the flags to 0, then overwrites them from the command line.
 */
typedef struct program_options
{
/* TCP port to listen on (--port). */
int port;
/* Number of clients to wait for before sending starts (--clients-count). */
int clients_count;
/* Size in bytes of the payload buffer allocated per client (--buffer-size). */
int buffer_size;
/* Set to 1 when the --sockopt value contains 'n'; passed to set_no_delay(). */
int no_delay;
/* Set to 1 when the --sockopt value contains 'c'; enables corking around sends. */
int cork;
/* Total number of messages to send (--total-messages). */
int total_messages;
/* Messages per batch (--batch-size); selects the batched send loop when > 0. */
int batch_size;
/* Target rate, printed as QPS (--message-rate); selects the rate send loop when > 0. */
int message_rate;
/* Pause between batches in milliseconds (--sleep-ms). */
int sleep_ms;
#if defined(WITH_URING)
/* 1 when --uring was given: send through io_uring instead of sendmsg(). */
int enable_uring;
/* 1 when --sq-poll was given: request IORING_SETUP_SQPOLL. */
int setup_sqpoll;
/* CPU passed as sq_thread_cpu (--sq-thread-affinity); -1 means not set. */
int sq_thread_cpu;
#endif
} program_options;
/* Per-connection state for one accepted client. */
typedef struct client_data
{
/* Socket returned by accept(). */
int sockfd;
/* Index of sockfd in the io_uring registered-files table (set by register_uring_files). */
int regfd;
/* Peer address as reported by accept(). */
struct sockaddr_in addr;
/* Describes the payload buffer; set up by the send loops. */
struct iovec iov;
/* Message header pointing at iov; handed to sendmsg / io_uring_prep_sendmsg. */
struct msghdr msg;
/* Copy of options->cork taken when the client was accepted. */
int is_corked;
} client_data;
/* Listening socket plus the table of accepted clients. */
typedef struct server
{
/* Listening socket. */
int sockfd;
/* Number of clients currently stored in `clients`. */
size_t n_clients;
/* Number of clients to wait for; copied from options->clients_count. */
size_t clients_count;
/* Accepted clients; slots [0, n_clients) are populated. */
client_data *clients[MAX_CLIENTS];
#if defined(WITH_URING)
/* Non-zero once the ring below has been initialised. */
int enable_uring;
/* io_uring instance used by do_uring_sendmsg(). */
struct io_uring ring;
#endif
} server;
/* Fixed-capacity buffer of latency samples. */
typedef struct summary
{
/* Sample storage, n_latency entries long. */
long *latency_ns;
/* Capacity of latency_ns; the report functions iterate over all of it. */
size_t n_latency;
/* Next slot summary_push() writes to; wraps back to 0 at n_latency. */
size_t index;
} summary;
/* Result of a send loop; either pointer may be NULL when nothing was collected. */
typedef struct stats
{
/* Summary holding the latency of individual send calls. */
summary* send_latency;
/* Summary intended for per-batch latencies. */
summary* batch_latency;
} stats;
/*
 * BEGIN_TIMED(var) ... END_TIMED()
 *
 * Brackets a region of code and pushes its CLOCK_MONOTONIC duration, in
 * nanoseconds, into the summary `var`. The two macros must be used as a pair
 * in the same scope: BEGIN_TIMED opens a do { ... } block that END_TIMED
 * closes.
 *
 * Notes:
 *  - BEGIN_TIMED must end on the clock_gettime line: a trailing line
 *    continuation there would splice the END_TIMED definition into it.
 *  - The duration includes whole seconds, not just the tv_nsec remainder.
 *  - A `break` inside the timed region leaves only the macro's own do/while
 *    (skipping the sample), not any enclosing loop.
 *  - END_TIMED keeps its trailing semicolon for compatibility with call sites
 *    written without their own.
 */
#define BEGIN_TIMED(var) \
    do { \
        struct timespec _timed_time_begin, _timed_time_end; \
        summary* _timed_summary = (var); \
        clock_gettime(CLOCK_MONOTONIC, &_timed_time_begin);

#define END_TIMED() \
        clock_gettime(CLOCK_MONOTONIC, &_timed_time_end); \
        struct timespec _timed_elapsed = time_diff(_timed_time_begin, _timed_time_end); \
        summary_push(_timed_summary, \
                     (long)_timed_elapsed.tv_sec * 1000000000L + _timed_elapsed.tv_nsec); \
    } while (0);
/*
 * Switch a descriptor between blocking and non-blocking mode.
 *
 * value != 0 clears O_NONBLOCK (blocking); value == 0 sets it (non-blocking).
 * Returns -1 when the current flags cannot be read, otherwise the result of
 * fcntl(F_SETFL).
 */
int set_blocking(int sockfd, int value)
{
    const int current = fcntl(sockfd, F_GETFL, 0);
    if (current == -1)
    {
        return current;
    }

    int updated;
    if (value)
    {
        updated = current & ~O_NONBLOCK;
    }
    else
    {
        updated = current | O_NONBLOCK;
    }
    return fcntl(sockfd, F_SETFL, updated);
}
/*
 * Enable (value != 0) or disable (value == 0) TCP_NODELAY on a socket.
 *
 * Uses the portable IPPROTO_TCP option level rather than the Linux-specific
 * SOL_TCP alias. Returns the setsockopt() result: 0 on success, -1 with errno
 * set on failure.
 */
int set_no_delay(int sockfd, int value)
{
    return setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value));
}
/*
 * Enable (value != 0) or disable (value == 0) TCP_CORK on a socket.
 *
 * Uses the portable IPPROTO_TCP option level rather than the Linux-specific
 * SOL_TCP alias (TCP_CORK itself remains a Linux option). Returns the
 * setsockopt() result: 0 on success, -1 with errno set on failure.
 */
int set_cork(int sockfd, int value)
{
    return setsockopt(sockfd, IPPROTO_TCP, TCP_CORK, &value, sizeof(value));
}
/* Result of validate_options(). */
typedef enum options_status
{
/* At least one option is missing or invalid. */
options_error = -1,
/* All checked options are acceptable. */
options_ok = 0,
} options_status;
/*
 * Check the parsed options before the server is started.
 *
 * Prints a diagnostic on stderr and returns options_error for the first
 * problem found; returns options_ok otherwise. Rejected configurations:
 *  - port outside 1024..65535 (65536 does not fit a 16-bit port);
 *  - fewer than one client to wait for;
 *  - a buffer too small to hold the 64-bit timestamp the send loops copy in;
 *  - fewer than one message in total (the latency summaries are sized by it);
 *  - both, or neither, of batch_size and message_rate (exactly one selects
 *    the send loop).
 */
options_status validate_options(const program_options* options)
{
    if (options->port < 1024 || options->port > 65535)
    {
        fprintf(stderr, "You must specify a valid port (--port). Value is %d\n", options->port);
        return options_error;
    }
    if (options->clients_count < 1)
    {
        fprintf(stderr, "You must specify a number of clients to wait before starting sending packets (--clients-count). Value is %d\n",
                options->clients_count);
        return options_error;
    }
    if (options->buffer_size < (int)sizeof(uint64_t))
    {
        fprintf(stderr, "You must specify a valid buffer size of at least %d bytes (--buffer-size). Value is %d\n",
                (int)sizeof(uint64_t), options->buffer_size);
        return options_error;
    }
    if (options->total_messages < 1)
    {
        fprintf(stderr, "You must specify a total number of messages to send (--total-messages). Value is %d\n", options->total_messages);
        return options_error;
    }
    if (options->batch_size > 0 && options->message_rate > 0)
    {
        fprintf(stderr, "You must specify either batch_size or message_rate, not both\n");
        return options_error;
    }
    if (options->batch_size <= 0 && options->message_rate <= 0)
    {
        fprintf(stderr, "You must specify one of batch_size (--batch-size) or message_rate (--message-rate)\n");
        return options_error;
    }
    return options_ok;
}
/*
 * Interpret the --sockopt value as a string of single-letter flags:
 * 'n' turns on no_delay, 'c' turns on cork. Any other character is ignored.
 * Always returns 0.
 */
int parse_program_socket_options(const char* val, program_options* options)
{
    for (const char* cursor = val; *cursor != '\0'; ++cursor)
    {
        if (*cursor == 'n')
        {
            options->no_delay = 1;
        }
        else if (*cursor == 'c')
        {
            options->cork = 1;
        }
    }
    return 0;
}
/*
 * Parse the command line into a program_options value.
 *
 * Numeric options default to -1 and flags to 0, so validate_options() can
 * tell which ones were never supplied. Numeric arguments are converted with
 * atoi(), so non-numeric text becomes 0.
 *
 * The short-option string mirrors the long options: every option that takes
 * an argument is followed by ':' (otherwise optarg would be NULL), and the
 * io_uring letters are only accepted when built WITH_URING.
 */
program_options parse_options(int argc, char* argv[])
{
    program_options options;
    options.port = -1;
    options.clients_count = -1;
    options.buffer_size = -1;
    options.no_delay = 0;
    options.cork = 0;
    options.message_rate = -1;
    options.total_messages = -1;
    options.batch_size = -1;
    options.sleep_ms = -1;
#if defined(WITH_URING)
    options.enable_uring = 0;
    options.setup_sqpoll = 0;
    options.sq_thread_cpu = -1;
#endif

    static const struct option long_options[] = {
        { "sockopt", required_argument, 0, 'o' },
        { "port", required_argument, 0, 'p' },
        { "clients-count", required_argument, 0, 'c' },
        { "buffer-size", required_argument, 0, 'b' },
        { "total-messages", required_argument, 0, 't' },
        { "batch-size", required_argument, 0, 'd' },
        { "sleep-ms", required_argument, 0, 'm' },
        { "message-rate", required_argument, 0, 'r' },
#if defined(WITH_URING)
        { "uring", no_argument, 0, 'u' },
        { "sq-poll", no_argument, 0, 's' },
        { "sq-thread-affinity", required_argument, 0, 'a' },
#endif
        { 0, 0, 0, 0}
    };

#if defined(WITH_URING)
    const char* const short_options = "no:p:b:c:d:m:r:t:usa:";
#else
    const char* const short_options = "no:p:b:c:d:m:r:t:";
#endif

    int c;
    while ((c = getopt_long(argc, argv, short_options, long_options, 0)) != -1)
    {
        switch (c)
        {
        case 'n':
            /* Still accepted so existing invocations keep working; it has no effect. */
            break;
        case 'o':
            parse_program_socket_options(optarg, &options);
            break;
        case 'p':
            options.port = atoi(optarg);
            break;
        case 'b':
            options.buffer_size = atoi(optarg);
            break;
        case 'c':
            options.clients_count = atoi(optarg);
            break;
        case 'd':
            options.batch_size = atoi(optarg);
            break;
        case 'm':
            options.sleep_ms = atoi(optarg);
            break;
        case 'r':
            options.message_rate = atoi(optarg);
            break;
        case 't':
            options.total_messages = atoi(optarg);
            break;
#if defined(WITH_URING)
        case 'u':
            options.enable_uring = 1;
            break;
        case 's':
            options.setup_sqpoll = 1;
            break;
        case 'a':
            options.sq_thread_cpu = atoi(optarg);
            break;
#endif
        default:
            /* Unknown option or missing argument: getopt_long has already
             * printed a diagnostic; the option is skipped. */
            break;
        }
    }
    return options;
}
/*
 * Print the effective configuration to stdout, grouped into TCP, message
 * and (when built WITH_URING) io_uring sections.
 */
void options_print(const program_options* options)
{
    const program_options* const cfg = options;

    /* TCP section */
    puts("TCP Options");
    puts("---------------");
    printf("Port: %d\n", cfg->port);
    printf("Clients count: %d\n", cfg->clients_count);
    printf("No_delay: %d\n", cfg->no_delay);
    printf("Cork: %d\n", cfg->cork);
    fputc('\n', stdout);

    /* Message section */
    puts("Message Options");
    puts("---------------");
    printf("Buffer size: %d bytes\n", cfg->buffer_size);
    printf("Batch size: %d\n", cfg->batch_size);
    printf("Message rate: %d QPS\n", cfg->message_rate);
    printf("Total messages: %d\n", cfg->total_messages);
    printf("Sleep time: %dms\n", cfg->sleep_ms);
    fputc('\n', stdout);

#if defined(WITH_URING)
    /* io_uring section */
    fputc('\n', stdout);
    puts("io_uring Options");
    puts("----------------");
    printf("Enable uring: %d\n", cfg->enable_uring);
    printf("Setup sqpoll: %d\n", cfg->setup_sqpoll);
    printf("sq_thread_cpu: %d\n", cfg->sq_thread_cpu);
    fputc('\n', stdout);
#endif
}
#if defined(WITH_URING)
/*
 * Build the io_uring_params for the requested options: a zeroed structure
 * with IORING_SETUP_SQPOLL set when setup_sqpoll is on, and
 * IORING_SETUP_SQ_AFF plus sq_thread_cpu set when a CPU was given
 * (sq_thread_cpu != -1).
 */
struct io_uring_params get_uring_params(const program_options* options)
{
    struct io_uring_params params;
    memset(&params, 0, sizeof(params));

    if (options->setup_sqpoll)
    {
        params.flags |= IORING_SETUP_SQPOLL;
    }

    if (options->sq_thread_cpu == -1)
    {
        return params;
    }

    params.flags |= IORING_SETUP_SQ_AFF;
    params.sq_thread_cpu = options->sq_thread_cpu;
    return params;
}
#endif
#if defined(WITH_URING)
/*
 * Create an io_uring with `entries` submission slots and map it into `ring`.
 *
 * Returns -errno when io_uring_setup() fails; otherwise returns the result of
 * io_uring_queue_mmap(), closing the ring descriptor if that result is
 * non-zero.
 */
int init_uring(const program_options* options, unsigned entries, struct io_uring* ring)
{
    struct io_uring_params params = get_uring_params(options);

    const int ring_fd = io_uring_setup(entries, &params);
    if (ring_fd < 0)
    {
        return -errno;
    }

    const int status = io_uring_queue_mmap(ring_fd, &params, ring);
    if (status != 0)
    {
        close(ring_fd);
    }
    return status;
}
#endif
#if defined(WITH_URING)
/*
 * Register every client socket with the ring so sends can use
 * IOSQE_FIXED_FILE. Each client's regfd is set to its index in the
 * registered table.
 *
 * Returns 0 when io_uring is disabled or there are no clients, a negative
 * errno value on failure (-ENOMEM if the temporary array cannot be
 * allocated), and the io_uring_register_files() result otherwise.
 */
int register_uring_files(server* srv)
{
    if (!srv->enable_uring || srv->n_clients == 0)
        return 0;

    int* fds = malloc(srv->n_clients * sizeof(*fds));
    if (!fds)
        return -ENOMEM;

    for (size_t i = 0; i < srv->n_clients; ++i)
    {
        client_data* client = srv->clients[i];
        fds[i] = client->sockfd;
        client->regfd = (int)i;
    }

    int ret = io_uring_register_files(&srv->ring, fds, (unsigned)srv->n_clients);
    /* The array is only needed for the duration of the registration call. */
    free(fds);

    if (ret < 0)
    {
        /* ret carries the error as a negative errno value. */
        fprintf(stderr, "io_uring_register_files: %s\n", strerror(-ret));
    }
    return ret;
}
#endif
/*
 * Allocate a server and create its listening socket (SO_REUSEADDR enabled).
 * When built WITH_URING and enable_uring is requested, a 1024-entry ring is
 * initialised as well.
 *
 * Returns the new server, or NULL on failure. On failure everything acquired
 * so far (socket, server structure) is released and a diagnostic is printed
 * for the failing step.
 */
server *server_new(const program_options* options)
{
    int optval = 1;
    int sockfd = -1;

    server* srv = calloc(1, sizeof *srv);
    if (!srv)
        return NULL;

    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        perror("socket");
        goto fail;
    }

    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval) < 0)
    {
        perror("setsockopt");
        goto fail;
    }

#if defined(WITH_URING)
    if (options->enable_uring)
    {
        int ring_ret = init_uring(options, 1024, &srv->ring);
        if (ring_ret < 0)
        {
            /* init_uring reports failure as a negative errno value. */
            errno = -ring_ret;
            perror("init_uring");
            goto fail;
        }
        srv->enable_uring = 1;
    }
#endif

    srv->sockfd = sockfd;
    srv->clients_count = options->clients_count;
    return srv;

fail:
    if (sockfd >= 0)
        close(sockfd);
    free(srv);
    return NULL;
}
/*
 * Bind the listening socket to `port` on all IPv4 interfaces (INADDR_ANY).
 * Returns the bind() result.
 */
int server_bind(server* srv, uint16_t port)
{
    struct sockaddr_in local;
    memset(&local, 0, sizeof(local));

    local.sin_family = AF_INET;
    local.sin_port = htons(port);
    local.sin_addr.s_addr = htonl(INADDR_ANY);

    const struct sockaddr* generic = (const struct sockaddr *)&local;
    return bind(srv->sockfd, generic, sizeof(local));
}
/*
 * Put the server's socket into listening state with the given backlog.
 * Returns the listen() result.
 */
int server_listen(server* srv, int backlog)
{
    const int listen_fd = srv->sockfd;
    return listen(listen_fd, backlog);
}
server* server_start(const program_options* options)
{
enum { Backlog = 128 };
server* srv = server_new(options);
if (!srv)
goto fail;
int ret = server_bind(srv, options->port);
if (ret < 0)
goto fail;
ret = server_listen(srv, Backlog);
if (ret < 0)
goto fail;
return srv;
fail:
perror("server_start");
free(srv);
return NULL;
}
/*
 * Shut the server down: close the listening socket, tear down the ring (when
 * one was set up), then close every client socket and free the client
 * records together with their payload buffers.
 *
 * The server structure itself is not freed; it remains owned by the caller.
 * A NULL server is ignored.
 *
 * NOTE(review): the payload buffers (iov.iov_base) are freed here on the
 * assumption that they are always NULL or heap-allocated, which is how the
 * send loops in this file set them up.
 */
void server_stop(server* srv)
{
    if (!srv)
        return;

    close(srv->sockfd);
#if defined(WITH_URING)
    if (srv->enable_uring)
        io_uring_queue_exit(&srv->ring);
#endif

    for (size_t i = 0; i < srv->n_clients; ++i)
    {
        client_data* client = srv->clients[i];
        if (!client)
            continue;

        close(client->sockfd);
        free(client->iov.iov_base);
        free(client);
        srv->clients[i] = NULL;
    }
    srv->n_clients = 0;
}
/*
 * Allocate a zero-initialised client record for an accepted connection and
 * store its peer address and socket. Returns NULL if allocation fails.
 */
client_data* client_new(struct sockaddr_in addr, int sockfd)
{
    client_data* created = calloc(1, sizeof *created);
    if (created != NULL)
    {
        created->sockfd = sockfd;
        created->addr = addr;
    }
    return created;
}
/*
 * Allocate a summary with room for `total_messages` latency samples.
 *
 * The sample array is zero-filled, so slots that are never written read as 0
 * rather than indeterminate values when the whole array is later sorted and
 * reported. Returns NULL if either allocation fails; nothing is leaked in
 * that case.
 */
summary* summary_new(size_t total_messages)
{
    summary* result = malloc(sizeof *result);
    if (!result)
        return NULL;

    /* calloc also guards the count * size multiplication against overflow. */
    result->latency_ns = calloc(total_messages, sizeof *result->latency_ns);
    if (result->latency_ns == NULL)
    {
        free(result);
        return NULL;
    }

    result->n_latency = total_messages;
    result->index = 0;
    return result;
}
/*
 * Print every slot of the summary as " [index] = value " on one line,
 * followed by a newline. The index is a size_t and is printed with %zu.
 */
void summary_dump(const summary* summary)
{
    for (size_t i = 0; i < summary->n_latency; ++i)
    {
        printf(" [%zu] = %ld ", i, summary->latency_ns[i]);
    }
    printf("\n");
}
/*
 * Store one latency sample in the next slot and advance the write position,
 * wrapping back to slot 0 once the capacity (n_latency) is reached.
 */
void summary_push(summary* summary, long latency_ns)
{
    const size_t slot = summary->index;

    summary->latency_ns[slot] = latency_ns;
    summary->index = (slot + 1) % summary->n_latency;
}
/*
 * Return end - start as a timespec, borrowing one second when the nanosecond
 * difference would otherwise be negative.
 */
struct timespec time_diff(struct timespec start, struct timespec end)
{
    struct timespec delta;

    delta.tv_sec = end.tv_sec - start.tv_sec;
    delta.tv_nsec = end.tv_nsec - start.tv_nsec;
    if (delta.tv_nsec < 0)
    {
        delta.tv_sec -= 1;
        delta.tv_nsec += 1000000000;
    }
    return delta;
}
/*
 * Sleep for `sleep_ns` nanoseconds.
 *
 * The duration is split into seconds and nanoseconds because nanosleep()
 * rejects a tv_nsec of one second or more. Non-positive durations return
 * immediately. If the sleep is interrupted by a signal it is resumed for the
 * remaining time.
 */
void do_nanosleep(int sleep_ns)
{
    if (sleep_ns <= 0)
        return;

    struct timespec remaining;
    remaining.tv_sec = sleep_ns / 1000000000;
    remaining.tv_nsec = sleep_ns % 1000000000;

    while (nanosleep(&remaining, &remaining) == -1 && errno == EINTR)
    {
        /* interrupted: `remaining` now holds the time left; sleep again */
    }
}
/*
 * qsort() comparator for long latency values, ascending.
 *
 * Compares the full long values and returns -1, 0 or 1; it neither truncates
 * them to int nor subtracts them, so large values and large differences are
 * ordered correctly.
 */
static int latency_less(const void* l1, const void *l2)
{
    const long a = *(const long *)l1;
    const long b = *(const long *)l2;
    return (a > b) - (a < b);
}
/*
 * Map a percentile in [0, 1] to an index into a sorted array of `size`
 * elements: floor(percentile * size), clamped to the valid range
 * [0, size - 1].
 *
 * Returns 0 for an empty array, and for a percentile that is not greater
 * than zero (including NaN). A percentile of 1.0 or more maps to the last
 * element instead of one past the end.
 */
int pct_nth_idx(double percentile, size_t size)
{
    if (size == 0 || !(percentile > 0.0))
        return 0;

    const double raw = percentile * (double)size;
    if (raw >= (double)size)
        return (int)(size - 1);

    return (int)raw;
}
/*
 * Return the element of `arr` at the position pct_nth_idx() computes for
 * `percentile` and `size`.
 */
long get_pct(long* arr, double percentile, size_t size)
{
    const int position = pct_nth_idx(percentile, size);
    return arr[position];
}
/*
 * Sort the summary's samples in place and print min, mean, max and a fixed
 * set of percentiles (10/20/50/80/90/99) to stdout.
 *
 * All n_latency slots are included. A NULL or empty summary is reported as
 * such instead of dividing by zero or indexing before the array.
 */
void report_summary(summary* summary)
{
    if (!summary || !summary->latency_ns || summary->n_latency == 0)
    {
        puts("No latency samples recorded");
        return;
    }

    qsort(summary->latency_ns, summary->n_latency, sizeof(long), latency_less);

    unsigned long long total_latency = 0;
    for (size_t i = 0; i < summary->n_latency; ++i)
    {
        total_latency += summary->latency_ns[i];
    }
    long mean_latency_ns = total_latency / summary->n_latency;

    double percentiles[] = { 0.10, 0.20, 0.50, 0.80, 0.90, 0.99 };

    /* The array is sorted ascending, so the ends are the extremes. */
    printf("Min: %ldns\n", summary->latency_ns[0]);
    printf("Mean: %ldns\n", mean_latency_ns);
    printf("Max: %ldns\n", summary->latency_ns[summary->n_latency - 1]);
    for (size_t i = 0; i < sizeof(percentiles) / sizeof(*percentiles); ++i)
    {
        printf("p(%lf) = %ldns\n", percentiles[i], get_pct(summary->latency_ns, percentiles[i], summary->n_latency));
    }
}
/*
 * For every connected client, query TCP_INFO and print one line with the
 * socket descriptor followed by twelve counters (last data sent/received,
 * congestion window, thresholds, RTT and variance, unacked/sacked/lost/
 * retransmitted/fack counts). Clients whose query fails are skipped.
 */
void print_tcp_info(server* srv)
{
    for (size_t idx = 0; idx < srv->n_clients; ++idx)
    {
        const int fd = srv->clients[idx]->sockfd;
        struct tcp_info info;
        socklen_t info_len = sizeof(info);

        if (getsockopt(fd, SOL_TCP, TCP_INFO, (void *)&info, &info_len) != 0)
        {
            continue;
        }

        printf("fd %d: %u %u %u %u %u %u %u %u %u %u %u %u\n",
               fd,
               info.tcpi_last_data_sent,
               info.tcpi_last_data_recv,
               info.tcpi_snd_cwnd,
               info.tcpi_snd_ssthresh,
               info.tcpi_rcv_ssthresh,
               info.tcpi_rtt,
               info.tcpi_rttvar,
               info.tcpi_unacked,
               info.tcpi_sacked,
               info.tcpi_lost,
               info.tcpi_retrans,
               info.tcpi_fackets);
    }
}
/*
 * Write the current CLOCK_REALTIME time, as nanoseconds since the epoch in a
 * uint64_t, at the start of the client's payload buffer.
 *
 * Returns the clock_gettime() result; the buffer is left untouched when the
 * clock cannot be read.
 */
int fill_timestamp(client_data* client)
{
    struct timespec now;
    const int status = clock_gettime(CLOCK_REALTIME, &now);
    if (status != 0)
    {
        return status;
    }

    uint64_t stamp = (uint64_t)(now.tv_sec) * 1000000000UL + (uint64_t)now.tv_nsec;
    memcpy(client->iov.iov_base, &stamp, sizeof(stamp));
    return status;
}
/*
 * Apply set_cork(value) to every client socket, but only when corking is
 * enabled in the options. Stops at, and returns, the first negative result;
 * returns 0 otherwise (including when corking is disabled).
 */
int cork_clients(const program_options* options, server* srv, int value)
{
    if (!options->cork)
    {
        return 0;
    }

    size_t idx = 0;
    while (idx < srv->n_clients)
    {
        const int status = set_cork(srv->clients[idx]->sockfd, value);
        if (status < 0)
        {
            return status;
        }
        ++idx;
    }
    return 0;
}
/*
 * Send each client's prepared message once with sendmsg().
 *
 * Returns the total number of bytes accepted across all clients. On the
 * first failure it prints a diagnostic and returns the error as a negative
 * errno value, matching the convention of the io_uring send path so callers
 * can pass -ret to strerror().
 */
ssize_t do_sendmsg(server* srv)
{
    ssize_t bytes = 0;
    for (size_t i = 0; i < srv->n_clients; ++i)
    {
        client_data* client = srv->clients[i];
        ssize_t ret = sendmsg(client->sockfd, &client->msg, 0);
        if (ret < 0)
        {
            /* perror() may change errno; capture it first. */
            const int saved_errno = errno;
            perror("sendmsg");
            return -(ssize_t)saved_errno;
        }
        bytes += ret;
    }
    return bytes;
}
#if defined(WITH_URING)
/*
 * Queue one sendmsg per client on the ring (using the registered file
 * indices), submit them, and collect whatever completions are already
 * available without waiting.
 *
 * Returns the number of bytes reported by the collected completions, or a
 * negative errno value: the io_uring_submit() error, -EAGAIN when no
 * submission slot was available for a client, or the first failed
 * completion. Collected completions are always marked as consumed, even when
 * one of them failed.
 */
ssize_t do_uring_sendmsg(server* srv)
{
    /* One slot per possible client, so a full batch always fits. */
    struct io_uring_cqe* cqes[sizeof srv->clients / sizeof srv->clients[0]];
    const size_t capacity = sizeof cqes / sizeof cqes[0];

    ssize_t bytes = 0;
    ssize_t first_error = 0;

    for (size_t i = 0; i < srv->n_clients; ++i)
    {
        client_data* client = srv->clients[i];
        struct io_uring_sqe* sqe = io_uring_get_sqe(&srv->ring);
        if (!sqe)
        {
            /* Submission queue is full; submit what was queued so far. */
            first_error = -EAGAIN;
            break;
        }
        io_uring_prep_sendmsg(sqe, client->regfd, &client->msg, 0);
        sqe->flags |= IOSQE_FIXED_FILE;
    }

    int ret = io_uring_submit(&srv->ring);
    if (ret < 0)
        return ret;

    const unsigned wanted = srv->n_clients < capacity ? (unsigned)srv->n_clients
                                                      : (unsigned)capacity;
    const unsigned ready = io_uring_peek_batch_cqe(&srv->ring, cqes, wanted);
    for (unsigned i = 0; i < ready; ++i)
    {
        const int res = cqes[i]->res;
        if (res < 0)
        {
            if (first_error == 0)
                first_error = res;
        }
        else
        {
            bytes += res;
        }
    }
    if (ready > 0)
        io_uring_cq_advance(&srv->ring, ready);

    return first_error < 0 ? first_error : bytes;
}
#endif
/* Signature shared by do_sendmsg and do_uring_sendmsg: send one message to every client of srv. */
typedef ssize_t (*send_func)(server* srv);
stats send_loop_rate(server* srv, const program_options* options)
{
summary *send_latency = NULL;
summary* batch_latency = NULL;
stats stats;
stats.send_latency = stats.batch_latency = NULL;
size_t send_period_ms = 0;
size_t messages_per_period = 0;
size_t sent_messages = 0;
int ret;
send_latency = summary_new(options->total_messages);
if (!send_latency)
{
perror("send_latency");
return stats;
}
batch_latency = summary_new(options->total_messages);
if (!batch_latency)
{
perror("batch_latency");
return stats;
}
stats.send_latency = send_latency;
stats.batch_latency = batch_latency;
if (options->message_rate > 1000)
{
send_period_ms = 1;
messages_per_period = options->message_rate / 1000;
}
else
{
send_period_ms = 1000 / options->message_rate;
messages_per_period = 1;
}
printf("Sending %llu messages every %llu ms...\n", (unsigned long long) messages_per_period, (unsigned long long) send_period_ms);
#if defined(WITH_URING)
send_func send_func = srv->enable_uring ? &do_uring_sendmsg : &do_sendmsg;
#else
printf("[Warn] uring not available, forcing sendmsg\n");
send_func send_func = &do_sendmsg;
#endif
for (size_t i = 0; i < srv->n_clients; ++i)
{
char *buffer = malloc(options->buffer_size);
memset(buffer, 'a' + i, options->buffer_size);
client_data* client = srv->clients[i];
client->iov.iov_base = (void *)buffer;
client->iov.iov_len = options->buffer_size;
memset(&client->msg, 0, sizeof(struct msghdr));
client->msg.msg_iov = &client->iov;
client->msg.msg_iovlen = 1;
}
#if defined(WITH_URING)
ret = register_uring_files(srv);
if (ret < 0)
return NULL;
#endif
do
{
struct timespec time_begin, time_end;
struct timespec tp;
ret = clock_gettime(CLOCK_REALTIME, &tp);
if (ret == 0)
{
uint64_t nanoseconds_since_epoch = (uint64_t)(tp.tv_sec) * 1000000000UL + (uint64_t)tp.tv_nsec;
for (size_t i = 0; i < srv->n_clients; ++i)
{
client_data* client = srv->clients[i];
memcpy(client->iov.iov_base, &nanoseconds_since_epoch, sizeof(nanoseconds_since_epoch));
}
}
cork_clients(options, srv, 1);
clock_gettime(CLOCK_MONOTONIC, &time_begin);
for (size_t i = 0; i < messages_per_period; ++i)
{
BEGIN_TIMED(send_latency)
{
ssize_t ret = (*send_func)(srv);
if (ret < 0)
{
printf("send error: %s\n", strerror(-ret));
break;
}
} END_TIMED();
}
sent_messages += messages_per_period;
clock_gettime(CLOCK_MONOTONIC, &time_end);
struct timespec elapsed = time_diff(time_begin, time_end);
cork_clients(options, srv, 0);
long sleep_time_ns = (send_period_ms * 1000000) - elapsed.tv_nsec;
if (sleep_time_ns > 0)
do_nanosleep(sleep_time_ns);
} while (sent_messages < options->total_messages);
return stats;
}
stats send_loop_batched(server* srv, const program_options* options)
{
summary *send_latency = NULL;
summary* batch_latency = NULL;
stats stats;
stats.send_latency = stats.batch_latency = NULL;
int ret;
size_t sent_messages = 0;
size_t batch_size;
send_latency = summary_new(options->total_messages);
if (!send_latency)
{
perror("send_latency");
return stats;
}
batch_latency = summary_new(options->total_messages);
if (!batch_latency)
{
perror("batch_latency");
return stats;
}
stats.send_latency = send_latency;
stats.batch_latency = batch_latency;
#if defined(WITH_URING)
send_func send_func = srv->enable_uring ? &do_uring_sendmsg : &do_sendmsg;
#else
printf("[Warn] uring not available, forcing sendmsg\n");
send_func send_func = &do_sendmsg;
#endif
for (size_t i = 0; i < srv->n_clients; ++i)
{
char *buffer = malloc(options->buffer_size);
memset(buffer, 'a' + i, options->buffer_size);
client_data* client = srv->clients[i];
client->iov.iov_base = (void *)buffer;
client->iov.iov_len = options->buffer_size;
memset(&client->msg, 0, sizeof(struct msghdr));
client->msg.msg_iov = &client->iov;
client->msg.msg_iovlen = 1;
}
#if defined(WITH_URING)
ret = register_uring_files(srv);
if (ret < 0)
return NULL;
#endif
batch_size = options->batch_size > 0 ? options->batch_size : 1;
do
{
struct timespec tp;
ret = clock_gettime(CLOCK_REALTIME, &tp);
if (ret == 0)
{
uint64_t nanoseconds_since_epoch = (uint64_t)(tp.tv_sec) * 1000000000UL + (uint64_t)tp.tv_nsec;
for (size_t i = 0; i < srv->n_clients; ++i)
{
client_data* client = srv->clients[i];
memcpy(client->iov.iov_base, &nanoseconds_since_epoch, sizeof(nanoseconds_since_epoch));
}
}
cork_clients(options, srv, 1);
for (size_t i = 0; i < batch_size; ++i)
{
BEGIN_TIMED(send_latency)
{
ssize_t ret = (*send_func)(srv);
if (ret < 0)
{
printf("send error: %s\n", strerror(-ret));
break;
}
}
END_TIMED()
++sent_messages;
}
cork_clients(options, srv, 0);
if (options->sleep_ms > 0)
do_nanosleep(options->sleep_ms * 1000000);
} while (sent_messages < options->total_messages);
return stats;
}
/*
 * Run the send loop selected by the options: batched when batch_size > 0,
 * otherwise rate-limited when message_rate > 0.
 *
 * Returns the statistics of the loop that ran. When neither mode is
 * selected nothing is sent and both summary pointers are NULL.
 */
stats send_loop(server* srv, const program_options* options)
{
    stats result;
    result.send_latency = NULL;
    result.batch_latency = NULL;

    if (options->batch_size > 0)
        result = send_loop_batched(srv, options);
    else if (options->message_rate > 0)
        result = send_loop_rate(srv, options);
    else
        fprintf(stderr, "send_loop: neither batch size nor message rate is set; nothing sent\n");

    return result;
}
/*
 * Accept clients until the configured number are connected, then run the
 * send loop and print the latency reports and per-client TCP info.
 *
 * Each accepted socket is switched to non-blocking mode and has TCP_NODELAY
 * and TCP_CORK set from the options. The number of clients waited for is
 * capped at the capacity of the server's client table.
 *
 * If accept() fails (other than being interrupted) the function returns
 * without sending. If configuring a single client fails, that socket is
 * closed, accepting stops, and sending proceeds with the clients accepted so
 * far. The latency summaries are freed after they have been reported.
 */
void accept_loop(server* srv, const program_options* options)
{
    const size_t capacity = sizeof srv->clients / sizeof srv->clients[0];
    size_t wanted = srv->clients_count;
    if (wanted > capacity)
    {
        fprintf(stderr, "Requested %llu clients but only %llu are supported; waiting for %llu\n",
                (unsigned long long) wanted, (unsigned long long) capacity, (unsigned long long) capacity);
        wanted = capacity;
    }

    while (srv->n_clients < wanted)
    {
        struct sockaddr_in client_addr;
        socklen_t len = sizeof(client_addr);
        int sockfd = accept(srv->sockfd, (struct sockaddr *) &client_addr, &len);
        if (sockfd < 0)
        {
            if (errno == EINTR)
                continue;
            perror("accept");
            return;
        }

        if (set_blocking(sockfd, 0) < 0)
        {
            perror("set_blocking");
            close(sockfd);
            break;
        }
        if (set_no_delay(sockfd, options->no_delay) < 0)
        {
            perror("set_no_delay");
            close(sockfd);
            break;
        }
        if (set_cork(sockfd, options->cork) < 0)
        {
            perror("set_cork");
            close(sockfd);
            break;
        }

        /* sin_port is in network byte order. */
        printf("Accepted connection from %s:%d\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));

        client_data* client = client_new(client_addr, sockfd);
        if (!client)
        {
            perror("client_new");
            close(sockfd);
            break;
        }
        client->is_corked = options->cork;
        srv->clients[srv->n_clients++] = client;

        if (srv->n_clients < wanted)
            printf("Waiting for %llu more clients\n", (unsigned long long) (wanted - srv->n_clients));
    }

    stats results = send_loop(srv, options);
    if (results.send_latency)
    {
        puts("Send latency report");
        report_summary(results.send_latency);
        free(results.send_latency->latency_ns);
        free(results.send_latency);
    }
    if (results.batch_latency)
    {
        puts("batch latency report");
        report_summary(results.batch_latency);
        free(results.batch_latency->latency_ns);
        free(results.batch_latency);
    }
    print_tcp_info(srv);
}
/*
 * Entry point: parse and validate the options, print them, start the server,
 * run the accept/send cycle once, then stop the server.
 *
 * Exits with EXIT_FAILURE when the options are invalid or the server cannot
 * be started, EXIT_SUCCESS otherwise.
 */
int main(int argc, char* argv[])
{
    int exit_code = EXIT_FAILURE;
    program_options options = parse_options(argc, argv);

    if (validate_options(&options) != options_error)
    {
        options_print(&options);

        server* srv = server_start(&options);
        if (srv != NULL)
        {
            printf("listening on 0.0.0.0:%d\n", options.port);
            accept_loop(srv, &options);
            server_stop(srv);
            exit_code = EXIT_SUCCESS;
        }
    }

    return exit_code;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment