Skip to content

Instantly share code, notes, and snippets.

@foxhoundsk
Last active September 16, 2022 18:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save foxhoundsk/19cdc786eb05a8d6836791000da3569a to your computer and use it in GitHub Desktop.
Save foxhoundsk/19cdc786eb05a8d6836791000da3569a to your computer and use it in GitHub Desktop.
Benchmarking POSIX message queue
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h> /* For O_* constants */
#include <limits.h>
#include <sys/stat.h> /* For mode constants */
#include <mqueue.h>
#include <stdio.h>
#include <time.h>
#include <signal.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <sched.h>
/* Name of the POSIX message queue shared by the receiver and the sender. */
#define MQ_NAME "/mq_bench"

/*
 * Run flag: starts at 1 and is cleared by the SIGALRM/SIGINT handlers.
 * Both the receive loop and the sender loop poll it.  It is written from
 * signal handlers, hence `volatile sig_atomic_t`.
 */
volatile sig_atomic_t should_run = 1;

/* Number of messages the receiver has taken off the queue so far. */
volatile size_t nr_msg;

/* CLOCK_MONOTONIC timestamp taken right before the receive loop starts. */
struct timespec start;

/* Receive buffer passed to mq_receive(). */
char buf[8192];
/*
 * Sender thread entry point.
 *
 * Opens the existing queue MQ_NAME write-only and non-blocking, then keeps
 * sending a fixed-size dummy message for as long as should_run is non-zero.
 *
 * data: unused.
 * Returns NULL in every case.  If the queue cannot be opened the error is
 * reported and the thread ends immediately instead of looping on an
 * invalid descriptor.
 */
void* sender_wrk(void *data)
{
    char msg[] = "this is a dummy message";
    mqd_t fd;

    (void)data;

    fd = mq_open(MQ_NAME, O_WRONLY | O_NONBLOCK);
    if (fd == (mqd_t)-1) {
        perror("sender: mq_open failed");
        return NULL;
    }

    while (should_run) {
        /* dummy computation (touches every byte, terminator included) */
        for (size_t i = 0; i < sizeof(msg); i++)
            msg[i]++;

        /*
         * The result is deliberately ignored: the descriptor is
         * non-blocking, so a failed send (e.g. queue full) is simply
         * retried on the next iteration.
         */
        mq_send(fd, msg, sizeof(msg), 0);
    }

    mq_close(fd);
    return NULL;
}
/*
 * SIGALRM handler: clears the run flag so the loops polling should_run
 * stop.  The signal number itself is not used.
 */
void sigalrm_handler(int sig)
{
    (void)sig;
    should_run = 0;
}
/*
 * SIGINT handler: clears the run flag so the loops polling should_run
 * stop.  The signal number itself is not used.
 */
void sigint_handler(int sig)
{
    (void)sig;
    should_run = 0;
}
/*
 * Progress-report handler (installed for SIGTSTP by main).
 *
 * Prints the number of messages received so far and the average receive
 * rate since `start`.  The elapsed time is computed in double from the
 * second/nanosecond differences; doing it in float on the absolute
 * monotonic timestamps throws away most of the sub-second precision.
 *
 * NOTE(review): printf() is not async-signal-safe; this is tolerated here
 * because the handler is only a diagnostic aid for a benchmark.
 */
void siginfo_handler(int sig)
{
    struct timespec now;
    double elapsed;
    size_t count = nr_msg;     /* single snapshot used for both outputs */
    int saved_errno = errno;   /* the calls below may overwrite errno */

    (void)sig;

    clock_gettime(CLOCK_MONOTONIC, &now);
    elapsed = (double)(now.tv_sec - start.tv_sec) +
              (double)(now.tv_nsec - start.tv_nsec) / 1e9;

    printf("total messages: %zu\n %f messages/s\n", count,
           elapsed > 0.0 ? (double)count / elapsed : 0.0);

    errno = saved_errno;
}
/*
 * Install `handler` for `signo` WITHOUT SA_RESTART, so that a blocking
 * mq_receive() fails with EINTR when the handler runs instead of being
 * restarted transparently.  Returns the sigaction() result.
 */
static int install_handler(int signo, void (*handler)(int))
{
    struct sigaction sa;

    sa.sa_handler = handler;
    sa.sa_flags = 0;
    sigemptyset(&sa.sa_mask);
    return sigaction(signo, &sa, NULL);
}

/* Seconds elapsed between two timespecs, computed in double precision. */
static double elapsed_seconds(const struct timespec *from,
                              const struct timespec *to)
{
    return (double)(to->tv_sec - from->tv_sec) +
           (double)(to->tv_nsec - from->tv_nsec) / 1e9;
}

/*
 * Receiver side of the benchmark.
 *
 * Usage: prog [seconds [pin]]
 *   seconds  run time; alarm() is armed with it (0 means no time limit)
 *   pin      when exactly two arguments are given, the receiver is pinned
 *            to CPU 0 and the sender thread to CPU 1; the value of the
 *            second argument itself is not examined
 *
 * Creates the queue, starts the sender thread, counts received messages
 * until should_run is cleared, prints the totals, then closes and unlinks
 * the queue.  Returns 0 on success and -1 on failure.
 */
int main(int ac, char **av)
{
    pthread_t sender;
    struct timespec now;
    cpu_set_t cpu_affi;
    sigset_t bench_sigs, saved_mask;
    mqd_t fd;
    double secs;
    int err;
    int status = -1;
    int has_affi = (ac == 3);
    // default values, ignored for root processes, but there still exists a hard limit
    struct mq_attr mq_attr = {.mq_maxmsg = 10, .mq_msgsize = sizeof(buf)};

    if (has_affi) {
        CPU_ZERO(&cpu_affi);
        CPU_SET(0, &cpu_affi); // receiver has affinity of CPU 0
        if (sched_setaffinity(getpid(), sizeof(cpu_set_t), &cpu_affi)) {
            perror("Error setting CPU affinity");
            exit(-1);
        }
    }

    if (install_handler(SIGINT, sigint_handler) ||
        install_handler(SIGTSTP, siginfo_handler) ||
        install_handler(SIGALRM, sigalrm_handler)) {
        perror("sigaction failed");
        exit(-1);
    }

    if (ac > 1) {
        char *end;
        long limit;

        errno = 0;
        limit = strtol(av[1], &end, 10);
        if (end == av[1] || *end != '\0' || errno == ERANGE ||
            limit < 0 || (unsigned long)limit > UINT_MAX) {
            fprintf(stderr, "invalid duration '%s'\n", av[1]);
            exit(-1);
        }
        alarm((unsigned int)limit);
    }

    fd = mq_open(MQ_NAME, O_RDONLY | O_CREAT, 0644, &mq_attr);
    if (fd == (mqd_t)-1) {
        perror("receiver: mq_open failed");
        exit(-1);
    }

    /* From here on every exit path closes and unlinks the queue. */
    if (mq_getattr(fd, &mq_attr)) {
        perror("mq_getattr failed");
        goto cleanup;
    }
    printf("mq_msgsize: %ld\nmq_curmsgs: %ld\nmq_maxmsg: %ld\n",
           mq_attr.mq_msgsize, mq_attr.mq_curmsgs, mq_attr.mq_maxmsg);

    /*
     * Create the sender with the benchmark signals blocked (the new thread
     * inherits the mask), then restore the mask here.  That way the
     * signals are delivered to this thread and can interrupt mq_receive().
     */
    sigemptyset(&bench_sigs);
    sigaddset(&bench_sigs, SIGINT);
    sigaddset(&bench_sigs, SIGTSTP);
    sigaddset(&bench_sigs, SIGALRM);
    pthread_sigmask(SIG_BLOCK, &bench_sigs, &saved_mask);
    err = pthread_create(&sender, NULL, sender_wrk, NULL);
    pthread_sigmask(SIG_SETMASK, &saved_mask, NULL);
    if (err) {
        /* pthread functions return the error number; errno is not set */
        errno = err;
        perror("Failed creating sender thread");
        goto cleanup;
    }

    if (has_affi) {
        CPU_ZERO(&cpu_affi);
        CPU_SET(1, &cpu_affi); // sender has affinity of CPU 1
        err = pthread_setaffinity_np(sender, sizeof(cpu_set_t), &cpu_affi);
        if (err) {
            errno = err;
            perror("Error setting CPU affinity");
            goto stop_sender;
        }
    }

    clock_gettime(CLOCK_MONOTONIC, &start);
    while (should_run) {
        ssize_t nr_recv = mq_receive(fd, buf, sizeof(buf), NULL);

        if (nr_recv < 0) {
            /* A handler ran while we were blocked: re-test should_run. */
            if (errno == EINTR)
                continue;
            perror("receive error");
            goto stop_sender;
        }
        nr_msg++;
    }
    /*
     * NOTE(review): a signal landing between the should_run test and the
     * entry into mq_receive() can still leave this thread blocked if the
     * queue is empty and the sender has already stopped.
     */

    clock_gettime(CLOCK_MONOTONIC, &now);
    secs = elapsed_seconds(&start, &now);
    printf("total messages: %zu\n%f messages/s\n", nr_msg,
           secs > 0.0 ? (double)nr_msg / secs : 0.0);
    status = 0;

stop_sender:
    should_run = 0; /* make sure the sender loop ends on error paths too */
    pthread_join(sender, NULL);

cleanup:
    if (mq_close(fd)) {
        perror("mq close failed");
        status = -1;
    }
    if (mq_unlink(MQ_NAME)) {
        perror("mq unlink failed");
        status = -1;
    }
    return status;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment