Skip to content

Instantly share code, notes, and snippets.

@jstimpfle
Created January 12, 2021 16:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jstimpfle/c79cae3b71bf390ebbed598034232114 to your computer and use it in GitHub Desktop.
Save jstimpfle/c79cae3b71bf390ebbed598034232114 to your computer and use it in GitHub Desktop.
//
// Baseline code (compiles on Linux) for sending UDP packets at a relatively
// high rate. With some modifications, this will be suited as a starting point
// to experiment with UDP-based transfer protocols.
//
// Achieves ~350 MiB/s (350K packets/s) on my laptop from 2011 (Lenovo x220,
// Sandy Bridge i5-2520M).
//
// Compile this as a binary "test" and run two instances in parallel (in two
// terminals).
//
// ./test receiver
// ./test sender
//
// 2021, Jens Stimpfle
//
#define _GNU_SOURCE // sendmmsg(), recvmmsg()
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
// Copy `count` elements from array `src` to array `dst` (non-overlapping).
// The unevaluated-looking comparison (dst)==(src) makes the compiler diagnose
// mismatched element types. Every argument is parenthesized: without that,
// `sizeof *dst` with dst = `p + n` would expand to `sizeof *p + n`, and a
// compound `count` would bind wrongly to the multiplication.
#define ARRAY_COPY(dst, src, count) ((void)((dst)==(src))/*type check*/, memcpy((dst), (src), (size_t)(count) * sizeof *(dst)))
// Write a printf-style message, whose arguments are already captured in a
// va_list, to stderr and terminate it with a newline.
void net_msg_vf(const char *fmt, va_list ap)
{
    FILE *const sink = stderr;
    vfprintf(sink, fmt, ap);
    fputs("\n", sink);
}
// Variadic front-end to net_msg_vf(): print a formatted line to stderr.
void net_msg_f(const char *fmt, ...)
{
    va_list args;
    va_start(args, fmt);
    net_msg_vf(fmt, args);
    va_end(args);
}
// Print a formatted line to stderr (via net_msg_vf()) and then terminate the
// process with abort(). Never returns.
void net_fatal_f(const char *fmt, ...)
{
    va_list args;
    va_start(args, fmt);
    net_msg_vf(fmt, args);
    va_end(args);
    abort();
}
// calloc() wrapper with an abort-on-failure policy: returns zeroed memory for
// `count` elements of `elem_size` bytes, or aborts with "OOM" if calloc()
// returns NULL. Note that calloc(0, ...) may legitimately return NULL, which
// this wrapper also treats as OOM.
static inline void *_net_xcalloc(size_t count, size_t elem_size)
{
    void *block = calloc(count, elem_size);
    if (!block)
        net_fatal_f("OOM");
    return block;
}
// Release memory obtained from _net_xcalloc()/net_xcalloc(). Passing NULL is
// allowed and does nothing (free(NULL) is a no-op).
static inline void net_free(void *ptr)
{
    void *const doomed = ptr;
    free(doomed);
}
// Typed convenience wrapper: allocate `count` zeroed objects of `type` and
// return a `type *` (aborts on allocation failure via _net_xcalloc()).
#define net_xcalloc(count, type) \
    ((type *) _net_xcalloc((count), sizeof (type)))
/////////////////////
// Time
/////////////////////
#include <time.h>
// A point in time as filled in by clock_gettime() (see get_timestamp()).
typedef struct timespec Timestamp;
// errno.h and stdint.h are already included at the top of the file; the
// repeated includes are harmless. inttypes.h provides the PRIi64/PRIu64
// format macros used below.
#include <errno.h>
#include <stdint.h>
#include <inttypes.h>
// Format a duration given in microseconds as "<seconds>.<milliseconds>s",
// e.g. 1234567 -> "1.234s", -1500000 -> "-1.500s", -500000 -> "-0.500s".
// The sub-millisecond part is truncated. Writes at most `size` bytes
// (including the terminating NUL) to `out`; does nothing if size <= 0.
static inline void format_time_delta(int64_t us, char *out, int size)
{
    if (size <= 0)
    {
        return;
    }
    // Format the magnitude and prepend a single '-' so that the millisecond
    // field never carries its own sign. Negating in unsigned arithmetic is
    // well-defined even for INT64_MIN.
    uint64_t magnitude = us < 0 ? (uint64_t) 0 - (uint64_t) us : (uint64_t) us;
    snprintf(out, (size_t) size, "%s%" PRIu64 ".%.3" PRIu64 "s",
             us < 0 ? "-" : "",
             magnitude / 1000000,
             (magnitude % 1000000) / 1000);
}
// Read CLOCK_MONOTONIC_RAW. If clock_gettime() fails, print the error to
// stderr and abort() -- callers never see an invalid timestamp.
static inline Timestamp get_timestamp(void)
{
    Timestamp now;
    if (clock_gettime(CLOCK_MONOTONIC_RAW, &now) == 0)
        return now;
    net_msg_f("clock_gettime() failed: %s", strerror(errno));
    abort();
}
// Return t1 - t2 in microseconds, computed in 64-bit arithmetic. The
// nanosecond difference is divided by 1000 with C's truncating division.
static inline int64_t timestamp_diff_microsecs(Timestamp t1, Timestamp t2)
{
    int64_t whole_seconds = (int64_t) t1.tv_sec - t2.tv_sec;
    int64_t nanos = (int64_t) t1.tv_nsec - t2.tv_nsec;
    return whole_seconds * (int64_t) 1000000LL + nanos / (int64_t) 1000;
}
// Return t1 - t2 in nanoseconds, computed in 64-bit arithmetic.
static inline int64_t timestamp_diff_nanosecs(Timestamp t1, Timestamp t2)
{
    int64_t whole_seconds = (int64_t) t1.tv_sec - t2.tv_sec;
    int64_t nanos = (int64_t) t1.tv_nsec - t2.tv_nsec;
    return whole_seconds * (int64_t) 1000000000LL + nanos;
}
// Return nonzero iff t1 is strictly earlier than t2 (seconds compared first,
// then nanoseconds).
// NOTE(review): wraparound of tv_sec is not handled; whether it needs to be
// is an open question.
static inline int timestamp_less(Timestamp t1, Timestamp t2)
{
    if (t1.tv_sec != t2.tv_sec)
        return t1.tv_sec < t2.tv_sec;
    return t1.tv_nsec < t2.tv_nsec;
}
// State for MEASURE(): start/end timestamps and a flag that makes the
// for-loop body run exactly once.
struct TimeMeasure { Timestamp ts, ts2; int active; };
// MEASURE("caption") <statement>
// Runs <statement> once and then prints "<caption> took <N>us" to stderr,
// where N is the elapsed time between two get_timestamp() calls. The elapsed
// time is computed with timestamp_diff_microsecs(), so it is correct across
// second boundaries. `caption` is expected to be a C string.
// Note: a `break` inside <statement> skips the report.
#define MEASURE(caption) \
    for (struct TimeMeasure tm = {0}; \
         tm.active++ == 0 && ((tm.ts = get_timestamp()), 1); \
         tm.ts2 = get_timestamp(), \
         fprintf(stderr, "%s took %" PRIi64 "us\n", (caption), timestamp_diff_microsecs(tm.ts2, tm.ts)))
// An IPv4 UDP endpoint in host byte order. It is converted with
// htonl()/htons() when building a struct sockaddr_in and filled from
// ntohl()/ntohs(). Unsigned types match those conversion functions, so
// addresses >= 128.0.0.0 and ports >= 32768 are stored without an
// implementation-defined signed conversion.
struct NetAddress
{
    uint32_t ip_number;   // IPv4 address, host byte order
    uint16_t port_number; // port, host byte order
};
// Build an AF_INET socket address (network byte order) from a NetAddress
// (host byte order). All other fields, including sin_zero, are zeroed.
static struct sockaddr_in _netaddress_to_sockaddr_in(struct NetAddress netaddress)
{
    struct sockaddr_in result = {
        .sin_family = AF_INET,
        .sin_port = htons(netaddress.port_number),
        .sin_addr.s_addr = htonl(netaddress.ip_number),
    };
    return result;
}
static struct NetAddress _sockaddr_in_to_netaddress(struct sockaddr_in *addr)
{
struct NetAddress naddr = {0};
naddr.ip_number = ntohl(addr->sin_addr.s_addr);
naddr.port_number = ntohs(addr->sin_port);
return naddr;
}
// Resolve `node`/`service` with getaddrinfo() and store the first IPv4/UDP
// result in *out.
// Returns 1 on success. Returns 0 (after printing a message) if resolution
// fails or no IPv4/UDP entry is present; *out is left untouched then.
int net_address_parse(const char *node, const char *service, struct NetAddress *out)
{
    struct addrinfo *list = NULL;
    int gai_status = getaddrinfo(node, service, NULL, &list);
    if (gai_status != 0) {
        net_msg_f("getaddrinfo() did not find any suitable address: %s", gai_strerror(gai_status));
        return 0;
    }

    // No hints were passed, so the result list may contain other families
    // and socket types: skip to the first AF_INET / SOCK_DGRAM / UDP entry.
    const struct addrinfo *match = list;
    while (match != NULL
           && !(match->ai_family == AF_INET
                && match->ai_socktype == SOCK_DGRAM
                && match->ai_protocol == IPPROTO_UDP))
    {
        match = match->ai_next;
    }

    int found = (match != NULL);
    if (found) {
        assert(match->ai_addrlen == sizeof (struct sockaddr_in));
        *out = _sockaddr_in_to_netaddress((struct sockaddr_in *) match->ai_addr);
    } else {
        net_msg_f("getaddrinfo() did not find any suitable address");
    }
    freeaddrinfo(list);
    return found;
}
// Size in bytes of the payload buffer that main() allocates after each
// NetPacket. It is also the data_size, and hence iov_len, that every packet
// is queued with.
enum
{
NET_FRAME_SIZE = 1024,
};
// Header for a reliable transfer protocol on top of UDP. It is not referenced
// by the code visible here; presumably it is meant to sit at the start of
// NetPacket::data (see the comment on that field) -- TODO confirm.
// NOTE(review): fields are in host byte order and the struct may contain
// compiler padding, so sending its in-memory bytes as-is is not a portable
// wire format.
struct NetReliableHeader
{
uint32_t sn; // sequence number
uint32_t ack_sn; // acknowledged sequence number
uint16_t payload_size; // presumably the number of valid bytes in payload[] -- TODO confirm
uint8_t payload[]; // flexible array member: payload bytes follow the header
};
// A packet buffer for sending and receiving. `data` is a flexible array
// member: the packet is allocated with extra room after the struct (main()
// reserves NET_FRAME_SIZE bytes per packet).
struct NetPacket
{
struct NetAddress peer_address; // copied into the queue's sockaddrs on enqueue; currently not passed to sendmmsg/recvmmsg
Timestamp last_send_time; // not used in this file yet; presumably for retransmission timing -- TODO confirm
uint16_t num_send_attempts; // not used in this file yet; presumably for retransmission -- TODO confirm
uint16_t data_size; // used as iov_len: datagram length when sending, buffer size when receiving (not updated to the received length)
char data[]; // for example a NetReliableHeader
};
// A ring buffer of NetPackets for batched sending and receiving on Linux
// (sendmmsg() / recvmmsg()).
//
// The four arrays all have `capacity` entries indexed by the same slot
// number. Slot i's mmsghdr points at iovecs[i] (set up once in
// net_packet_queue_set_capacity()). iovecs[i] is pointed at packets[i]->data
// by net_packet_queue_enqueue(). The occupied slots are the `fill_count`
// slots starting at `start_index`, wrapping around at `capacity`.
struct NetPacketQueue
{
struct NetPacket **packets; // pointers only; the queue does not own the packets
struct mmsghdr *mmsghdrs;
struct iovec *iovecs;
struct sockaddr_in *sockaddrs; // filled on enqueue, but msg_name is not pointed at it, so currently unused
int capacity; // number of slots; 0 until net_packet_queue_set_capacity()
int start_index; // slot of the oldest queued packet
int fill_count; // number of queued packets
};
// Allocate an empty, zeroed queue (capacity 0). Call
// net_packet_queue_set_capacity() before enqueueing.
struct NetPacketQueue *net_packet_queue_create(void);
// Release a queue obtained from net_packet_queue_create().
void net_packet_queue_destroy(struct NetPacketQueue *queue);
// Allocate `capacity` slots. May only be called once per queue (aborts otherwise).
void net_packet_queue_set_capacity(struct NetPacketQueue *queue, int capacity);
// Append `packet` (by pointer). Returns 1 on success, 0 if the queue is full.
int net_packet_queue_enqueue(struct NetPacketQueue *queue, struct NetPacket *packet);
// Send (is_send != 0) or receive into queued packets using sendmmsg()/recvmmsg().
// Shipped packets are removed from the queue and their pointers stored in
// packets_return. Returns how many were shipped.
int net_packet_queue_send_or_recv(struct NetPacketQueue *queue, int sockfd, struct NetPacket **packets_return, int count, int is_send);
// Allocate a zeroed queue: capacity 0, no slot arrays yet. Aborts on OOM.
struct NetPacketQueue *net_packet_queue_create(void)
{
    struct NetPacketQueue *fresh = net_xcalloc(1, struct NetPacketQueue);
    return fresh;
}
void net_packet_queue_destroy(struct NetPacketQueue *queue)
{
return net_free(queue);
}
// Allocate `capacity` slots (packet pointers, mmsghdrs, iovecs, sockaddrs)
// and wire each slot's mmsghdr to its single iovec. Re-sizing a queue that
// already has a capacity is not implemented and aborts.
void net_packet_queue_set_capacity(struct NetPacketQueue *queue, int capacity)
{
    if (queue->capacity > 0)
        net_fatal_f("Not supported.");

    queue->packets   = net_xcalloc(capacity, struct NetPacket *);
    queue->mmsghdrs  = net_xcalloc(capacity, struct mmsghdr);
    queue->iovecs    = net_xcalloc(capacity, struct iovec);
    queue->sockaddrs = net_xcalloc(capacity, struct sockaddr_in);
    queue->capacity  = capacity;

    for (int slot = 0; slot < capacity; slot++) {
        struct msghdr *msg = &queue->mmsghdrs[slot].msg_hdr;
        memset(&queue->mmsghdrs[slot], 0, sizeof queue->mmsghdrs[slot]);
        // msg_name is deliberately left NULL (pointing it at
        // queue->sockaddrs + slot is disabled), so sends rely on the socket
        // being connected and receives do not report the peer address.
        msg->msg_namelen = sizeof queue->sockaddrs[slot];
        msg->msg_iov = &queue->iovecs[slot];
        msg->msg_iovlen = 1;
    }
}
// Append `packet` at the tail of the ring. The slot's iovec is pointed at
// packet->data with length packet->data_size, and the peer address is
// converted into the slot's sockaddr (only relevant for a send queue).
// Returns 1 on success, or 0 without modifying anything if the queue is full.
int net_packet_queue_enqueue(struct NetPacketQueue *queue, struct NetPacket *packet)
{
    if (queue->fill_count == queue->capacity)
        return 0;

    int raw = queue->start_index + queue->fill_count;
    int slot = raw >= queue->capacity ? raw - queue->capacity : raw;
    assert(0 <= slot && slot < queue->capacity);

    queue->packets[slot] = packet;
    queue->iovecs[slot].iov_base = packet->data;
    queue->iovecs[slot].iov_len = packet->data_size;
    queue->sockaddrs[slot] = _netaddress_to_sockaddr_in(packet->peer_address); // only needed if this is a send queue
    queue->fill_count += 1;
    return 1;
}
// Ship up to `count` packets from the head of the queue: send them with
// sendmmsg() if is_send is nonzero, otherwise receive into their buffers with
// recvmmsg(). Shipped packets are removed from the queue, and their pointers
// are written to packets_return[0 .. result-1]. packets_return must have room
// for `count` pointers.
//
// Returns the number of packets shipped, which may be fewer than requested.
// EAGAIN (== EWOULDBLOCK on Linux) and EINTR end the call early; any other
// socket error aborts the process.
//
// NOTE(review): on receive, packet->data_size is not updated to the received
// datagram length (mmsghdrs[i].msg_len), so callers see the buffer size.
int net_packet_queue_send_or_recv(struct NetPacketQueue *queue, int sockfd, struct NetPacket **packets_return, int count, int is_send)
{
    int num_todo = count;
    if (num_todo > queue->fill_count)
    {
        num_todo = queue->fill_count;
    }
    int num_done = 0;
    // The queue is a ring buffer, so the packets to ship may form two
    // contiguous runs: at most 2 iterations.
    while (num_done < num_todo)
    {
        // Contiguous run from start_index to the end of the ring, but never
        // more than is still wanted (packets_return holds only `count`).
        int num_ship = queue->capacity - queue->start_index;
        if (num_ship > num_todo - num_done)
        {
            num_ship = num_todo - num_done;
        }
        struct mmsghdr *batch = queue->mmsghdrs + queue->start_index;
        int r;
        if (is_send)
        {
            r = sendmmsg(sockfd, batch, (unsigned int) num_ship, 0);
        }
        else
        {
#if 0
            // wait at most 100 usecs. If we don't specify an explicit time,
            // Linux will only give us 5-10 messages per call :-(
            struct timespec max_time = { 0, 100000 };
            r = recvmmsg(sockfd, batch, (unsigned int) num_ship, 0, &max_time);
#else
            r = recvmmsg(sockfd, batch, (unsigned int) num_ship, 0, NULL);
#endif
        }
        if (r < 0)
        {
            if (errno == EAGAIN || errno == EINTR)
            {
                break;
            }
            net_fatal_f("Unhandled error: %s", strerror(errno));
        }
        // Hand back exactly the r packets that were actually shipped. Single
        // identifiers are passed to ARRAY_COPY so its expansion is safe.
        struct NetPacket **dst = packets_return + num_done;
        struct NetPacket **src = queue->packets + queue->start_index;
        ARRAY_COPY(dst, src, r);
        num_done += r;
        queue->fill_count -= r;
        queue->start_index += r;
        if (queue->start_index == queue->capacity)
        {
            queue->start_index = 0;
        }
        assert(queue->start_index < queue->capacity);
        // The loop exists only to handle the wraparound; we don't want to
        // ship many small batches. If fewer packets were shipped than
        // requested, stop and let the caller come back later.
        if (r != num_ship)
        {
            break;
        }
    }
    return num_done;
}
// A UDP endpoint wrapping one socket. sockfd comes from
// _net_make_nonblocking_socket() in net_conn_create() and is closed by
// net_conn_destroy().
struct NetConn
{
int sockfd;
};
// Create an AF_INET/SOCK_DGRAM socket and put it in non-blocking mode.
// Returns the file descriptor. Returns -1 on failure, with errno describing
// the failing call (socket() or fcntl()); no descriptor is leaked.
int _net_make_nonblocking_socket(void)
{
    int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (sockfd < 0)
    {
        return -1;
    }
    int flags = fcntl(sockfd, F_GETFL, 0);
    if (flags == -1 || fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) == -1)
    {
        // close() may overwrite errno; keep the fcntl() error for the caller.
        int saved_errno = errno;
        close(sockfd);
        errno = saved_errno;
        return -1;
    }
    return sockfd;
}
// Allocate a NetConn holding a fresh non-blocking UDP socket. Follows the
// file's abort-on-failure policy: if the socket cannot be created, the
// process aborts with a message here, rather than storing fd -1 and failing
// later with EBADF in connect()/bind().
struct NetConn *net_conn_create(void)
{
    struct NetConn *conn = net_xcalloc(1, struct NetConn);
    conn->sockfd = _net_make_nonblocking_socket();
    if (conn->sockfd < 0)
    {
        net_fatal_f("Failed to create socket: %s", strerror(errno));
    }
    return conn;
}
// Close the connection's socket and free the NetConn itself.
void net_conn_destroy(struct NetConn *conn)
{
    int fd = conn->sockfd;
    close(fd);
    net_free(conn);
}
// connect() the UDP socket to server_address, so that sends without an
// explicit destination go there. Aborts on failure.
void net_conn_connect(struct NetConn *conn, struct NetAddress server_address)
{
    struct sockaddr_in target = _netaddress_to_sockaddr_in(server_address);
    if (connect(conn->sockfd, (struct sockaddr *) &target, sizeof target) < 0)
        net_fatal_f("Failed to connect(): %s", strerror(errno));
}
// bind() the UDP socket to local_address so it can receive datagrams sent
// there. Aborts on failure.
void net_conn_bind(struct NetConn *conn, struct NetAddress local_address)
{
    struct sockaddr_in local = _netaddress_to_sockaddr_in(local_address);
    if (bind(conn->sockfd, (struct sockaddr *) &local, sizeof local) < 0)
        net_fatal_f("Failed to bind(): %s", strerror(errno));
}
// Print the command-line synopsis to stderr and exit with status 1.
void print_usage_and_exit(void)
{
    const int failure_status = 1;
    net_msg_f("Usage: ./test {sender|receiver}");
    exit(failure_status);
}
int main(int argc, const char **argv)
{
if (argc != 2)
{
print_usage_and_exit();
}
int is_sender;
if (!strcmp(argv[1], "sender"))
{
is_sender = 1;
}
else if (!strcmp(argv[1], "receiver"))
{
is_sender = 0;
}
else
{
print_usage_and_exit();
}
struct NetAddress server_address;
if (!net_address_parse("127.0.0.1", "9009", &server_address))
{
net_fatal_f("Failed to parse NetAddress");
}
struct NetConn *conn = net_conn_create();
struct NetPacketQueue *pkq = net_packet_queue_create();
if (is_sender)
{
net_conn_connect(conn, server_address);
}
else
{
net_conn_bind(conn, server_address);
}
net_packet_queue_set_capacity(pkq, 1024); // UIO_MAXIOV is currently 1024, so it doesn't make a lot of sense to go beyond that
int free_packets_capacity = 1024;
int free_packets_count = free_packets_capacity;
struct NetPacket **free_packets_stack = net_xcalloc(free_packets_capacity, struct NetPacket *);
void *packets_memory = _net_xcalloc(free_packets_capacity, sizeof (struct NetPacket) + NET_FRAME_SIZE);
for (int i = 0; i < free_packets_capacity; i++)
{
struct NetPacket *packet = (void *) ((char *) packets_memory + i * (sizeof (struct NetPacket) + NET_FRAME_SIZE));
free_packets_stack[i] = packet;
packet->data_size = NET_FRAME_SIZE;
// Fill the packet payload with some fake data. Zeroed data
// works as well, but when it's written to the terminal one
// doesn't notice because no characters appear. Which is a
// problem because the terminal is typically the bottleneck, so
// one wants to know when the data is being written there.
int index = 0;
for (int j = 0; index < NET_FRAME_SIZE; j = (j + 1) % 26)
{
char c = 'A' + j;
int stop = index + 32;
if (stop > NET_FRAME_SIZE)
{
stop = NET_FRAME_SIZE;
}
memset((char *)packet->data + index, c, stop - index);
index = stop;
if (index > 0)
{
packet->data[index - 1] = '\n';
}
}
}
uint64_t num_iters = 0;
uint64_t num_packets_sent = 0;
uint64_t num_packets_received = 0;
for (;;)
{
++ num_iters;
static struct timespec last_log;
struct timespec now;
clock_gettime(CLOCK_MONOTONIC_RAW, &now);
if (((uint64_t) 1000000000LL * (uint64_t) now.tv_sec + (uint64_t) now.tv_nsec)
- ((uint64_t) 1000000000LL * (uint64_t) last_log.tv_sec + (uint64_t) last_log.tv_nsec)
> 1000000000LL)
{
last_log = now;
net_msg_f("\n\nnow %" PRIu64 " iterations", num_iters);
net_msg_f("%" PRIu64 " packets sent", num_packets_sent);
net_msg_f("%" PRIu64 " packets received", num_packets_received);
}
// Put as many packets in the ship queue as possible.
// XXX: we're not actually sending anything useful, for now.
while (free_packets_count > 0 && net_packet_queue_enqueue(pkq, free_packets_stack[free_packets_count - 1]))
{
-- free_packets_count;
}
// try to ship
{
int num = net_packet_queue_send_or_recv(pkq, conn->sockfd,
free_packets_stack + free_packets_count,
free_packets_capacity - free_packets_count,
is_sender);
for (int i = free_packets_count; i < free_packets_count + num; i++)
{
struct NetPacket *packet = free_packets_stack[i];
fwrite(packet->data, 1, packet->data_size, stdout);
}
free_packets_count += num;
if (is_sender)
{
num_packets_sent += num;
}
else
{
num_packets_received += num;
}
//net_msg_f("%s %d packets", is_sender ? "sent out" : "received", num);
}
{
struct timespec ts = { 0, 100000 };
nanosleep(&ts, NULL);
}
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment