Skip to content

Instantly share code, notes, and snippets.

@apconole
Created April 10, 2024 15:42
Show Gist options
  • Save apconole/5d9985eeab495455c0e6c9c08e017086 to your computer and use it in GitHub Desktop.
Save apconole/5d9985eeab495455c0e6c9c08e017086 to your computer and use it in GitHub Desktop.
/* Copyright (C) 2024 - Red Hat, Inc.
* Author: Aaron Conole <aconole@redhat.com>
* Licensed under GPLv2
*/
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <errno.h>
#include <fcntl.h>
#include <getopt.h>
#include <inttypes.h>
#include <netinet/in.h>
#include <poll.h>
#include <pthread.h>
#include <sched.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <time.h>
#include <unistd.h>
/*
 * Compile-time limits: BUCKETS is the maximum number of packet-size
 * buckets (and the size of every per-thread bucket array); MAX_EVENTS is
 * an epoll event-batch size.
 */
enum {
	BUCKETS = 16,
	MAX_EVENTS = 16,
};
/* How a listener thread waits for incoming datagrams (see ready_to_read()). */
enum polling_mechanism {
	PM_USE_READ_NB = 0,	/* non-blocking MSG_PEEK probe only */
	PM_USE_SELECT = 1,	/* select() + FIONREAD on a blocking socket */
	PM_USE_POLL = 2,	/* poll(), then the select/peek fallbacks */
	PM_USE_EPOLL = 3,	/* epoll_wait(), then the poll/select/peek fallbacks */
};
/*
 * Receive counters for one packet-size bucket.  In this file only `bytes`
 * is written (udp_listener) and read (run_report/report); `frame_size` and
 * `last_time_rx` appear to be unused here.
 */
struct bucket {
uint64_t frame_size;
uint64_t bytes; /* total bytes of datagrams whose length mapped to this bucket */
uint64_t last_time_rx;
};
/*
 * Per-listener-thread state.  main() allocates the array with calloc (so
 * any field it does not set is zero) and configures each entry before
 * pthread_create(); run_report() later sums every thread's `buckets`.
 */
struct thread_data {
struct bucket buckets[BUCKETS]; /* counters indexed by get_bucket(datagram length) */
int16_t verbose; /* -1 quiet (-q), 0 default, >0 verbose (-v) */
uint16_t port; /* UDP port the thread binds on INADDR_ANY */
uint64_t end_time; /* stop time in time_ticks_ms() units; 0 = run until killed */
uint8_t core_idx; /* CPU the thread pins itself to; 0 = no pinning */
enum polling_mechanism poll_type; /* readiness mechanism passed to ready_to_read() */
pthread_t thread_handle; /* filled by pthread_create(), joined in main() */
};
/* Array of per-thread state, one entry per listener thread (set up in main). */
static struct thread_data *byte_ctrs;

/* Time-unit conversion factors used by the sleep and clock helpers. */
enum {
	NSEC_PER_USEC = 1000,
	MS_PER_SEC = 1000,
	US_PER_MS = 1000,
};
/*
 * Relative sleep on CLOCK_MONOTONIC; a thin wrapper over clock_nanosleep().
 * Returns 0 on success or an error number (e.g. EINTR, in which case the
 * unslept remainder is stored in *rem).  errno is not used.
 */
static int __nsleep(const struct timespec *req, struct timespec *rem)
{
	const int relative_flags = 0; /* no TIMER_ABSTIME: `req` is a duration */
	int status = clock_nanosleep(CLOCK_MONOTONIC, relative_flags, req, rem);

	return status;
}
/*
 * Sleep for about `usec` microseconds on CLOCK_MONOTONIC.  If a signal
 * interrupts the sleep it simply ends early; the remainder is ignored.
 */
static void pend_us(uint32_t usec)
{
	const uint32_t usec_per_sec = US_PER_MS * MS_PER_SEC;
	struct timespec req = { 0 }, rem = { 0 };

	req.tv_sec = (time_t)(usec / usec_per_sec);
	req.tv_nsec = (long)(usec % usec_per_sec) * NSEC_PER_USEC;
	(void)__nsleep(&req, &rem);
}
/*
 * Sleep for `sec` whole seconds on CLOCK_MONOTONIC, resuming with the
 * remaining time if a signal interrupts the sleep.
 *
 * The timespec is built directly rather than delegating to pend_us():
 * sec * 1000000 does not fit in uint32_t once sec > 4294, so a long
 * report interval would silently wrap to a sleep of a few milliseconds.
 */
static void pend_s(uint32_t sec)
{
	struct timespec req = { .tv_sec = (time_t)sec, .tv_nsec = 0 };
	struct timespec rem = { 0 };

	/* __nsleep returns the error number directly (not via errno). */
	while (__nsleep(&req, &rem) == EINTR)
		req = rem;
}
/*
 * Current CLOCK_MONOTONIC time in milliseconds, used for run deadlines and
 * elapsed-time reporting.  Returns 0 if the clock cannot be read.
 */
static uint64_t time_ticks_ms(void)
{
	struct timespec now;

	if (clock_gettime(CLOCK_MONOTONIC, &now))
		return 0; /* 0 ticks... uh-oh */

	return (uint64_t)now.tv_sec * MS_PER_SEC +
	       (uint64_t)now.tv_nsec / (NSEC_PER_USEC * US_PER_MS);
}
/* Inclusive upper bound, in bytes, of each bucket (filled by setup_buckets()). */
static size_t *bucket_sizes;
/* Number of valid entries in bucket_sizes. */
static size_t num_buckets;

/*
 * Map a datagram length to a bucket index.  Selection is first-fit: the
 * first bucket whose size is >= length wins, so an ascending size list
 * gives the tightest bucket.  Lengths above every bound go into the last
 * bucket.
 *
 * The result is always in [0, BUCKETS), even when no bucket sizes are
 * configured, so callers may index a BUCKETS-sized array with it directly.
 */
static int get_bucket(uint64_t length)
{
	size_t limit = num_buckets < BUCKETS ? num_buckets : BUCKETS;

	/* No buckets configured: never return -1 (would index buckets[-1]). */
	if (limit == 0)
		return 0;

	for (size_t i = 0; i < limit; ++i) {
		if (length <= bucket_sizes[i])
			return (int)i;
	}
	return (int)(limit - 1);
}
#define GBIT (1ULL << 30)
#define MBIT (1ULL << 20)
#define KBIT (1ULL << 10)
/*
 * Format the average bit rate of `bytes` transferred over `seconds` as a
 * short string such as "12.50 Mbps".  Units are binary multiples
 * (1 Kbps == 1024 bits/s), matching KBIT/MBIT/GBIT above.
 *
 * Returns a pointer to a static buffer that the next call overwrites, so
 * this is not reentrant.  When `seconds` is 0 no rate exists and "n/a" is
 * returned instead of an inf/NaN from dividing by zero.
 */
static const char *get_human_rate(uint64_t bytes, uint64_t seconds)
{
	static char human_rate[80];
	double rate_bps;
	const char *unit;

	if (seconds == 0) {
		snprintf(human_rate, sizeof(human_rate), "n/a");
		return human_rate;
	}

	rate_bps = 8.0 * (double)bytes / (double)seconds;
	if (rate_bps >= GBIT) {
		rate_bps /= GBIT;
		unit = "Gbps";
	} else if (rate_bps >= MBIT) {
		rate_bps /= MBIT;
		unit = "Mbps";
	} else if (rate_bps >= KBIT) {
		rate_bps /= KBIT;
		unit = "Kbps";
	} else {
		/* Value is in bits per second; "Bps" would read as bytes. */
		unit = "bps";
	}
	snprintf(human_rate, sizeof(human_rate), "%.2f %s", rate_bps, unit);
	return human_rate;
}
/*
 * Print the byte counts in `buckets` (one entry per configured bucket
 * size) and their average bit rates since `start_time_ms`, a
 * time_ticks_ms() timestamp.
 *
 * verb < 0 prints nothing; verb == 0 prints only buckets that received
 * data; verb > 0 prints every bucket plus a total line.
 */
static void report(struct bucket *buckets, int verb, uint64_t start_time_ms)
{
	uint64_t elapsed_time = (time_ticks_ms() - start_time_ms) / MS_PER_SEC;
	/*
	 * elapsed_time is whole seconds and is 0 for sub-second intervals;
	 * divide by at least 1 so rates never come out as inf/NaN.
	 */
	uint64_t rate_secs = elapsed_time ? elapsed_time : 1;
	uint64_t total_bw = 0;

	if (verb < 0)
		return; /* skip reporting when verb is quiet */

	for (size_t i = 0; i < num_buckets; i++) {
		total_bw += buckets[i].bytes;
		if (buckets[i].bytes || verb > 0)
			printf("Bucket %zu(%zu): %" PRIu64 " bytes in %" PRIu64
			       "s (rate: %f bits/s)\n",
			       i, bucket_sizes[i], buckets[i].bytes, elapsed_time,
			       8 * (double)buckets[i].bytes / (double)rate_secs);
	}
	if (verb > 0)
		printf("+++ Total bw: %" PRIu64 " bytes in %" PRIu64
		       "s = %s (rate %f bits/s)\n",
		       total_bw, elapsed_time,
		       get_human_rate(total_bw, rate_secs),
		       8.0 * (double)total_bw / (double)rate_secs);
}
/*
 * Configure a freshly created socket for the polling mechanism `pm`.
 * Every mechanism except select() gets O_NONBLOCK added to its file
 * flags.  Returns 0 on success, -1 if fcntl() fails (errno from fcntl).
 */
static int prepare_socket(int socket, enum polling_mechanism pm)
{
	const int cur_flags = fcntl(socket, F_GETFL, 0);

	if (cur_flags == -1)
		return -1;

	/* For select, we won't use a non-blocking socket because it will just
	 * always return 'true'. There is still an ioctl check for # bytes, but
	 * this might exercise the select method more. */
	if (pm != PM_USE_SELECT &&
	    fcntl(socket, F_SETFL, cur_flags | O_NONBLOCK) == -1)
		return -1;

	return 0;
}
/*
 * Non-blocking readiness probe: peek at one byte without consuming it.
 * Returns 1 if a datagram is waiting, 0 if the call would block, and -1
 * on any other recv() error (errno is left as recv() set it).
 */
static int check_nb_socket(int socket)
{
	char peek_byte;

	if (recv(socket, &peek_byte, 1, MSG_PEEK | MSG_DONTWAIT) != -1)
		return 1;

	return (errno == EAGAIN || errno == EWOULDBLOCK) ? 0 : -1;
}
/*
 * Wait up to 500 ms for one readiness event on `epoll_fd`.
 * Returns 1 if the event reports EPOLLIN, 0 on timeout, on a non-input
 * event, or when epoll_fd is -1 (no epoll instance), and -1 if
 * epoll_wait() fails.
 */
static int epoll_udp_result(int epoll_fd)
{
	struct epoll_event ready_ev;
	int n_ready;

	if (epoll_fd == -1)
		return 0;

	n_ready = epoll_wait(epoll_fd, &ready_ev, 1, 500);
	if (n_ready <= 0)
		return n_ready;

	return (ready_ev.events & EPOLLIN) ? 1 : 0;
}
/*
 * Wait up to 500 ms for `socket` to become readable using poll().
 * Returns 1 if POLLIN is reported, 0 on timeout or when only other events
 * (e.g. error/hang-up) are reported, and -1 if poll() fails.
 */
static int poll_udp_result(int socket)
{
	struct pollfd watch = { .fd = socket, .events = POLLIN };
	int n_ready = poll(&watch, 1, 500);

	if (n_ready <= 0)
		return n_ready;

	return (watch.revents & POLLIN) ? 1 : 0;
}
/*
 * Wait up to 500 ms for `socket` to become readable using select(), then
 * confirm with FIONREAD that there are pending bytes.
 * Returns 1 if select() fired and bytes are pending, 0 on timeout, on a
 * select() error, or when no bytes are pending, and -1 if the ioctl fails.
 */
static int select_udp_result(int socket)
{
	struct timeval timeout = { .tv_sec = 0, .tv_usec = 500 * US_PER_MS };
	unsigned int pending = 0;
	fd_set readable;

	FD_ZERO(&readable);
	FD_SET(socket, &readable);
	if (select(socket + 1, &readable, NULL, NULL, &timeout) <= 0)
		return 0;

	/* check if recv would return bytes */
	if (ioctl(socket, FIONREAD, &pending) < 0)
		return -1;

	return pending > 0;
}
/*
 * Decide whether `socket` has data, trying the mechanisms in order from
 * `pm` downwards: epoll -> poll -> select -> non-blocking peek.  Each
 * stage that does not report readiness (timeout or error) falls through
 * to the next, so with epoll an idle socket may wait for the 500 ms
 * timeout of epoll, poll and select in turn.  The final result is
 * check_nb_socket()'s: 1 ready, 0 would block, -1 error.
 */
static int ready_to_read(int socket, int epoll_fd,
			 enum polling_mechanism pm)
{
	bool try_poll = (pm == PM_USE_EPOLL || pm == PM_USE_POLL);
	bool try_select = try_poll || pm == PM_USE_SELECT;

	if (pm == PM_USE_EPOLL && epoll_udp_result(epoll_fd) > 0)
		return 1;
	if (try_poll && poll_udp_result(socket) > 0)
		return 1;
	if (try_select && select_udp_result(socket) > 0)
		return 1;

	/* PM_USE_READ_NB, unknown values, and every failed stage end here. */
	return check_nb_socket(socket);
}
/*
 * Register `socket` with `epoll_fd` for edge-triggered input readiness.
 * Returns epoll_ctl()'s result: 0 on success, -1 with errno set.
 */
static int add_to_epoll(int epoll_fd, int socket)
{
	struct epoll_event registration = {
		.events = EPOLLIN | EPOLLET,
		.data = { .fd = socket },
	};

	return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket, &registration);
}
/*
 * Listener thread body; `arg` is this thread's struct thread_data.
 *
 * Optionally pins the thread to CPU td->core_idx.  It then binds a UDP/IPv4
 * socket to INADDR_ANY:td->port with SO_REUSEPORT, so all listener threads
 * can share the port.  Next it waits for data using td->poll_type and adds
 * each received datagram's length to the matching bucket in td->buckets.
 *
 * Runs until time_ticks_ms() reaches td->end_time; 0 means forever.
 * A failed bind() is retried once per second until that deadline.  A
 * receive or readiness error closes the socket and starts over.
 * Always returns NULL.
 */
static void *udp_listener(void *arg)
{
	struct thread_data *td = (struct thread_data *)arg;
	int optval = 1, udp_socket, res;
	int epollfd = -1;
	struct sockaddr_in sa;

	if (td->core_idx) {
		pid_t curr = gettid();
		cpu_set_t cpus;

		CPU_ZERO(&cpus);
		CPU_SET(td->core_idx, &cpus);
		sched_setaffinity(curr, sizeof(cpus), &cpus);
	}

thr_restart:
	udp_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
	if (udp_socket < 0) {
		printf("THREAD-FATAL: Failed to allocate socket (%s).\n",
		       strerror(errno));
		return NULL;
	}
	if (setsockopt(udp_socket, SOL_SOCKET, SO_REUSEPORT, &optval,
		       sizeof(optval)) < 0) {
		printf("THREAD-FATAL: Failed to setsockopt (%s).\n", strerror(errno));
		close(udp_socket);
		return NULL;
	}
	if (prepare_socket(udp_socket, td->poll_type) < 0) {
		printf("THREAD-FATAL: Failed to prepare: %s\n", strerror(errno));
		close(udp_socket);
		return NULL;
	}

	/* XXX: Add a v6 option here as well. */
	memset(&sa, 0, sizeof sa);
	sa.sin_family = AF_INET;
	sa.sin_port = htons(td->port);
	sa.sin_addr.s_addr = htonl(INADDR_ANY);
	if (bind(udp_socket, (struct sockaddr *)&sa, sizeof sa) < 0) {
		if (td->verbose != -1)
			printf("ERROR: Failed to bind (%s).\n", strerror(errno));
		close(udp_socket);
		/*
		 * Stop retrying once the run deadline has passed; otherwise a
		 * port that never frees up keeps this thread alive forever and
		 * main()'s pthread_join() never returns.
		 */
		if (td->end_time && td->end_time <= time_ticks_ms())
			return NULL;
		pend_s(1);
		goto thr_restart;
	}

	if (td->poll_type == PM_USE_EPOLL) {
		epollfd = epoll_create1(0);
		if (epollfd >= 0 && add_to_epoll(epollfd, udp_socket) < 0) {
			close(epollfd);
			epollfd = -1;
		}
		if (epollfd < 0 && td->verbose != -1)
			printf("ERROR: Couldn't setup epoll socket - using fallback.\n");
	}
	if (td->verbose > 0 && epollfd >= 0)
		printf("+++ Allocated epoll socket.\n");

	while (!td->end_time || (td->end_time > time_ticks_ms())) {
		struct sockaddr_in sar;
		socklen_t len;
		char buf[2048];
		bool got_data = false;

		while (ready_to_read(udp_socket, epollfd, td->poll_type) > 0) {
			got_data = true;
			/*
			 * Non-blocking mechanisms drain the socket until EAGAIN.
			 * select() uses a blocking socket, so it reads exactly one
			 * datagram per readiness check.  recvfrom() returning 0 is
			 * a valid zero-length UDP datagram, not an error.
			 */
			do {
				len = sizeof(sar);
				res = recvfrom(udp_socket, buf, sizeof(buf), 0,
					       (struct sockaddr *)&sar, &len);
				if (res >= 0)
					td->buckets[get_bucket(res)].bytes += res;
			} while (res >= 0 && td->poll_type != PM_USE_SELECT);

			if (res < 0) {
				/* Drained, or interrupted by a signal: wait again. */
				if (errno == EAGAIN || errno == EWOULDBLOCK ||
				    errno == EINTR)
					break;
				if (td->verbose > 0)
					printf("thread recvfrom failure. restarting socket.\n");
				goto restart_socket;
			}
		}
		if (!got_data)
			pend_us(500);

		/*
		 * Without an epoll instance, probe readiness again only to catch
		 * a socket error (negative result) and recover by reopening.
		 */
		if (epollfd == -1 &&
		    ready_to_read(udp_socket, epollfd, td->poll_type) < 0) {
			if (td->verbose > 0)
				printf("thread read ready failure. restarting socket.\n");
			goto restart_socket;
		}
	}

	if (epollfd >= 0)
		close(epollfd);
	close(udp_socket);
	return NULL;

restart_socket:
	if (epollfd >= 0)
		close(epollfd);
	epollfd = -1;
	close(udp_socket);
	goto thr_restart;
}
static void help(char *prgm)
{
printf("Usage: %s [OPTIONS].. port\n", prgm);
printf("Simple UDP sink that tells how much packet data was sent "
"with heuristics.\n");
printf("\n");
printf("Mandatory arguments:\n");
printf(" port Set the port on which to listen.\n");
printf("\n");
printf("Options and arguments:\n");
printf(" -h, --help Print this help message and exit.\n");
printf(" -v, --verbose Set the program to be verbose in output.\n");
printf(" -t, --stat-time=SEC Set the stat print time in seconds.\n");
printf(" -a, --stop-after=SEC Set the time to complete - default is "
"0 (Unlimited).\n");
printf(" -H, --threads=THREADS Spawn THREADS number of threads for "
"listening (default = 1).\n");
printf(" -F, --affinity Only when spawning threads, forces all "
"threads onto cores.\n");
printf(" -m, --method=MTHD One of {epoll, poll, select, read}\n");
printf(" -q, --quiet Set the program to suppress output.\n");
exit(0);
}
/*
 * Parse `arg` as a base-10 unsigned number for option `what`.  Junk,
 * negative values, overflow, or values outside [min, max] print an error
 * followed by the usage text, which exits the program.
 */
static unsigned long long parse_bounded(char *prgm, const char *what,
					const char *arg,
					unsigned long long min,
					unsigned long long max)
{
	char *end = NULL;
	unsigned long long val;

	errno = 0;
	val = strtoull(arg, &end, 10);
	/* strtoull() silently negates a leading '-', so reject any '-'. */
	if (end == arg || *end != '\0' || errno == ERANGE ||
	    strchr(arg, '-') != NULL || val < min || val > max) {
		printf("%s: invalid %s '%s' (expected %llu-%llu)\n",
		       prgm, what, arg, min, max);
		help(prgm);
	}
	return val;
}

/*
 * Parse argv into the caller's settings.  *pt defaults to PM_USE_EPOLL;
 * the other outputs keep their incoming values unless an option
 * overrides them.
 *
 * *bucket_string is pointed at the -k argument inside argv, which lives
 * for the whole program.  Unknown options, -h, a bad -m method, an invalid
 * number, or a missing port all print usage and exit via help().
 */
static void parse_arguments(int argc, char *argv[], bool *affinity,
			    uint16_t *nr_threads, int *verbose,
			    uint64_t *secs_to_report, uint64_t *secs_to_run,
			    uint16_t *lport, char **bucket_string,
			    enum polling_mechanism *pt)
{
	int opt;
	int long_index = 0;
	static struct option long_options[] = {
		{"affinity", no_argument, NULL, 'F'},
		{"buckets", required_argument, NULL, 'k'},
		{"help", no_argument, NULL, 'h'},
		{"method", required_argument, NULL, 'm'},
		{"quiet", no_argument, NULL, 'q'},
		{"stat-time", required_argument, NULL, 't'},
		{"stop-after", required_argument, NULL, 'a'},
		{"threads", required_argument, NULL, 'H'},
		{"verbose", no_argument, NULL, 'v'},
		{NULL, 0, NULL, 0}
	};

	*pt = PM_USE_EPOLL;
	while ((opt = getopt_long(argc, argv, "Fhvqt:a:H:k:m:", long_options,
				  &long_index)) != -1) {
		/*
		 * getopt_long() returns 0 only for entries with a flag pointer;
		 * the matched entry is long_options[long_index], not [optind].
		 */
		if (opt == 0)
			opt = long_options[long_index].val;
		switch (opt) {
		case 'F':
			*affinity = true;
			break;
		case 'k':
			*bucket_string = optarg;
			break;
		case 'H':
			*nr_threads = (uint16_t)parse_bounded(argv[0], "thread count",
							      optarg, 1, UINT16_MAX);
			break;
		case 'm':
			if (!strcmp(optarg, "poll"))
				*pt = PM_USE_POLL;
			else if (!strcmp(optarg, "epoll"))
				*pt = PM_USE_EPOLL;
			else if (!strcmp(optarg, "select"))
				*pt = PM_USE_SELECT;
			else if (!strcmp(optarg, "read"))
				*pt = PM_USE_READ_NB;
			else
				help(argv[0]);
			break;
		case 'v':
			*verbose = 1;
			break;
		case 'q':
			*verbose = -1;
			break;
		case 't':
			/* 0 would make the report loop spin; pend_s() takes uint32_t. */
			*secs_to_report = parse_bounded(argv[0], "stat time",
							optarg, 1, UINT32_MAX);
			break;
		case 'a':
			/* 0 keeps the documented "unlimited" meaning. */
			*secs_to_run = parse_bounded(argv[0], "stop-after time",
						     optarg, 0, UINT32_MAX);
			break;
		default: /* fallthrough */
		case 'h':
			help(argv[0]);
			break;
		}
	}

	/* Parse non-option argument - which should be port. */
	if (optind < argc && argv[optind]) {
		*lport = (uint16_t)parse_bounded(argv[0], "port", argv[optind],
						 1, UINT16_MAX);
	} else {
		printf("%s: missing port\n", argv[0]);
		help(argv[0]);
	}
}
/*
 * Parse the comma-separated size list `buckets` (e.g. "64,128,1500") into
 * the global bucket_sizes/num_buckets.  Numbers use strtoul() base 0, so
 * decimal, 0x hex, and leading-0 octal are all accepted.  At most BUCKETS
 * sizes are kept; any extra entries are reported and ignored.
 *
 * Exits with a FATAL message on an invalid number, an allocation failure,
 * or an empty list, because get_bucket() needs at least one bucket.
 * With verb > 0 the resulting list is printed; with verb < 0 (quiet)
 * nothing is printed.
 */
static void setup_buckets(const char *buckets, int32_t verb)
{
	char *current_tok;
	char *save_ptr = NULL;
	char *bucket_string = strdup(buckets);

	if (bucket_string == NULL) {
		printf("FATAL: Unable to allocate bucket string.\n");
		exit(-1);
	}
	for (current_tok = strtok_r(bucket_string, ",", &save_ptr);
	     current_tok != NULL;
	     current_tok = strtok_r(NULL, ",", &save_ptr)) {
		char *end_ptr = NULL;
		unsigned long size;
		size_t *grown;
		bool bad;

		if (num_buckets >= BUCKETS) {
			if (verb >= 0)
				printf("WARNING: Only %d buckets supported; ignoring "
				       "'%s' and any later sizes.\n",
				       (int)BUCKETS, current_tok);
			break;
		}

		errno = 0;
		size = strtoul(current_tok, &end_ptr, 0);
		bad = (end_ptr == current_tok || errno == ERANGE);
		/* Tolerate blanks before the comma ("64 ,128"), nothing else. */
		while (*end_ptr == ' ' || *end_ptr == '\t')
			end_ptr++;
		if (bad || *end_ptr != '\0') {
			printf("FATAL: Invalid number: '%s'.\n", current_tok);
			free(bucket_string);
			exit(-1);
		}

		/* Keep the old array valid if realloc() fails. */
		grown = realloc(bucket_sizes, sizeof(*bucket_sizes) * (num_buckets + 1));
		if (grown == NULL) {
			printf("FATAL: Bucket setup failed: %s\n", strerror(errno));
			free(bucket_string);
			exit(-1);
		}
		bucket_sizes = grown;
		bucket_sizes[num_buckets++] = size;
	}
	free(bucket_string);

	if (num_buckets == 0) {
		printf("FATAL: No bucket sizes given.\n");
		exit(-1);
	}

	/* Only in verbose mode: quiet (-1) must stay silent. */
	if (verb > 0) {
		printf("++ Setup %zu buckets {", num_buckets);
		for (size_t i = 0; i < num_buckets; i++)
			printf("%zu,", bucket_sizes[i]);
		printf("}\n");
	}
}
static void run_report(int32_t verb, uint16_t nr_threads, uint64_t start_time)
{
struct bucket buckets[BUCKETS];
size_t thread;
if (verb < 0)
return;
memset(&buckets, 0, sizeof buckets);
for (thread = 0; thread < nr_threads; thread++) {
size_t bucket;
for (bucket = 0; bucket < BUCKETS; bucket++)
buckets[bucket].bytes += byte_ctrs[thread].buckets[bucket].bytes;
}
report(buckets, verb, start_time);
}
int main(int argc, char *argv[])
{
char *bucket_string = "64,128,256,512,1024,1500,1800,2048";
uint64_t secs_to_report = 5;
enum polling_mechanism pt;
uint64_t secs_to_run = 0;
uint16_t nr_threads = 1;
uint64_t start_time = 0;
uint64_t end_time = 0;
uint64_t cur_time = 0;
bool affinity = false;
uint16_t lport = 0;
int32_t verb = 0;
int opt_ind;
parse_arguments(argc, argv, &affinity, &nr_threads, &verb, &secs_to_report,
&secs_to_run, &lport, &bucket_string, &pt);
if (verb > 0) {
printf("++++ Starting listeners on %d reporting every %lus\n"
" polling with %d and then stopping", lport,
secs_to_report, pt);
if (secs_to_run) {
printf(" after %lus.\n", secs_to_run);
} else {
printf(" on kill / CTRL+C\n");
}
}
setup_buckets(bucket_string, verb);
byte_ctrs = calloc(sizeof(struct thread_data), nr_threads);
if (!byte_ctrs) {
printf("ERROR: Unable to allocate thread buckets.\n");
exit(-1);
}
if (secs_to_run)
end_time = time_ticks_ms() + (secs_to_run * MS_PER_SEC);
for (opt_ind = 0; opt_ind < nr_threads; opt_ind++) {
if (verb > 0)
printf("Spawning thread (%d)...\n", opt_ind + 1);
byte_ctrs[opt_ind].verbose = verb;
byte_ctrs[opt_ind].port = lport;
byte_ctrs[opt_ind].end_time = end_time;
if (affinity)
byte_ctrs[opt_ind].core_idx = opt_ind + 1;
pthread_create(&byte_ctrs[opt_ind].thread_handle, NULL,
udp_listener, &byte_ctrs[opt_ind]);
}
cur_time = time_ticks_ms();
start_time = cur_time;
while ((end_time > cur_time) || !end_time) {
pend_s(secs_to_report);
run_report(verb, nr_threads, start_time);
cur_time = time_ticks_ms();
}
for (opt_ind = 0; opt_ind < nr_threads; opt_ind++) {
pthread_join(byte_ctrs[opt_ind].thread_handle, NULL);
}
/* Final report - always print. */
run_report(1, nr_threads, start_time);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment