Skip to content

Instantly share code, notes, and snippets.

@hmenke
Last active May 17, 2022 12:24
Show Gist options
  • Save hmenke/83bc7a26c0f5f3e0867f4e5a9573e182 to your computer and use it in GitHub Desktop.
Save hmenke/83bc7a26c0f5f3e0867f4e5a9573e182 to your computer and use it in GitHub Desktop.
Example of POSIX mqueue to communicate with pthreads.
#ifndef __cplusplus
#define _GNU_SOURCE
#endif
// C standard library
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
// POSIX C library
#include <mqueue.h>
#include <pthread.h>
#include <unistd.h>
// Serialization
typedef struct _message message;
struct _message {
int id;
char *text;
};
message *message_new(int id, const char *text) {
message *msg = (message *)malloc(sizeof(message));
msg->id = id;
msg->text = text ? strdup(text) : NULL;
return msg;
}
int message_id(message *msg) { return msg->id; }
char *message_text(message *msg) { return msg->text; }
void message_free(message *msg) {
free(msg->text);
free(msg);
msg = NULL;
}
typedef struct _archive archive;
struct _archive{
size_t size;
char *data;
};
archive *archive_new(size_t size, const char *data) {
archive *ar = (archive *)malloc(sizeof(archive));
ar->data = NULL;
ar->size = size;
if (size > 0) {
memcpy(ar->data, data, size);
}
return ar;
}
size_t archive_size(archive *ar) { return ar->size; }
char *archive_data(archive *ar) { return ar->data; }
void archive_free(archive *ar) {
free(ar->data);
free(ar);
ar = NULL;
}
archive *message_serialize(const message *msg) {
archive *ar = archive_new(0, NULL);
size_t len = strlen(msg->text) + 1;
ar->size = sizeof(int) + len * sizeof(char);
ar->data = (char *)malloc(ar->size);
size_t offset = 0;
memcpy(ar->data + offset, &msg->id, sizeof(int));
offset += sizeof(int);
memcpy(ar->data + offset, msg->text, len * sizeof(char));
return ar;
}
message *message_unserialize(const char *buffer) {
message *msg = message_new(0, NULL);
size_t offset = 0;
memcpy(&msg->id, buffer + offset, sizeof(int));
offset += sizeof(int);
// I hope you serialized that null terminator
msg->text = strdup(buffer + offset);
return msg;
}
int message_send(mqd_t mqdes, const message *msg, unsigned int msg_prio) {
archive *ar = message_serialize(msg);
int ret = mq_send(mqdes, archive_data(ar), archive_size(ar), msg_prio);
archive_free(ar);
return ret;
}
message *message_receive(mqd_t mqdes, char *buffer, size_t size) {
struct mq_attr mqstat;
if (size == 0) {
mq_getattr(mqdes, &mqstat);
size = mqstat.mq_msgsize;
buffer = (char *)malloc(size * sizeof(char));
}
message *msg = NULL;
switch (mq_receive(mqdes, buffer, size, NULL)) {
case 0:
// zero-length message means termination
break;
case -1:
perror("mq_receive");
break;
default:
msg = message_unserialize(buffer);
break;
}
if (size == 0) {
free(buffer);
}
return msg;
}
// Thread
typedef struct {
int tid;
mqd_t *pmqdes;
} thread_info;
static void *start_routine(void *arg) {
thread_info const *const tinfo = (thread_info *)arg;
mqd_t mqdes = *(tinfo->pmqdes);
struct mq_attr mqstat;
mq_getattr(mqdes, &mqstat);
char *buffer = (char *)malloc(mqstat.mq_msgsize * sizeof(char));
message *msg;
while ((msg = message_receive(mqdes, buffer, mqstat.mq_msgsize)) != NULL) {
printf("%d [%ld] %d, %s", tinfo->tid,
strlen(message_text(msg)), message_id(msg), message_text(msg));
usleep(10000); // Placeholder for long work
message_free(msg);
}
free(buffer);
return NULL;
}
// Main loop
int main() {
// Get number of system threads
const int nthreads = sysconf(_SC_NPROCESSORS_ONLN);
// Open a new message queue
const char queue_name[] = "/test_queue";
mqd_t mqdes = mq_open(queue_name, O_CREAT | O_RDWR, 0644, NULL);
if (mqdes == (mqd_t)-1) {
perror("mq_open");
}
// Start threads
pthread_t *thread = (pthread_t *)malloc(nthreads * sizeof(pthread_t));
thread_info *tinfo = (thread_info *)malloc(nthreads * sizeof(thread_info));
for (int i = 0; i < nthreads; ++i) {
tinfo[i].tid = i;
tinfo[i].pmqdes = &mqdes;
if (pthread_create(&thread[i], NULL, &start_routine, &tinfo[i]) != 0) {
printf("Error!\n");
}
}
// Send messages to all threads via the queue
for (int i = 0; i < 100; ++i) {
char *text = NULL;
asprintf(&text, "Hello mqdes %d\n", i);
message *msg = message_new(100 - i, text);
free(text);
if (message_send(mqdes, msg, 1) != 0) {
perror("mq_send message");
}
message_free(msg);
}
// Send empty message to all threads to signal termination
for (int i = 0; i < nthreads; ++i) {
if (mq_send(mqdes, "", 0, 0) != 0) {
perror("mq_send terminate"); // mq_send sets errno
}
}
// Join all the threads
for (int i = 0; i < nthreads; ++i) {
pthread_join(thread[i], NULL);
}
free(thread);
free(tinfo);
// Make sure there are no messages left in the queue
struct mq_attr attr;
mq_getattr(mqdes, &attr);
assert(attr.mq_curmsgs == 0);
mq_close(mqdes);
mq_unlink(queue_name);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment