Last active
May 17, 2022 12:24
-
-
Save hmenke/83bc7a26c0f5f3e0867f4e5a9573e182 to your computer and use it in GitHub Desktop.
Example of POSIX mqueue to communicate with pthreads.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef __cplusplus | |
#define _GNU_SOURCE | |
#endif | |
// C standard library | |
#include <assert.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
// POSIX C library | |
#include <mqueue.h> | |
#include <pthread.h> | |
#include <unistd.h> | |
// Serialization | |
typedef struct _message message; | |
struct _message { | |
int id; | |
char *text; | |
}; | |
message *message_new(int id, const char *text) { | |
message *msg = (message *)malloc(sizeof(message)); | |
msg->id = id; | |
msg->text = text ? strdup(text) : NULL; | |
return msg; | |
} | |
int message_id(message *msg) { return msg->id; } | |
char *message_text(message *msg) { return msg->text; } | |
void message_free(message *msg) { | |
free(msg->text); | |
free(msg); | |
msg = NULL; | |
} | |
typedef struct _archive archive; | |
struct _archive{ | |
size_t size; | |
char *data; | |
}; | |
archive *archive_new(size_t size, const char *data) { | |
archive *ar = (archive *)malloc(sizeof(archive)); | |
ar->data = NULL; | |
ar->size = size; | |
if (size > 0) { | |
memcpy(ar->data, data, size); | |
} | |
return ar; | |
} | |
size_t archive_size(archive *ar) { return ar->size; } | |
char *archive_data(archive *ar) { return ar->data; } | |
void archive_free(archive *ar) { | |
free(ar->data); | |
free(ar); | |
ar = NULL; | |
} | |
archive *message_serialize(const message *msg) { | |
archive *ar = archive_new(0, NULL); | |
size_t len = strlen(msg->text) + 1; | |
ar->size = sizeof(int) + len * sizeof(char); | |
ar->data = (char *)malloc(ar->size); | |
size_t offset = 0; | |
memcpy(ar->data + offset, &msg->id, sizeof(int)); | |
offset += sizeof(int); | |
memcpy(ar->data + offset, msg->text, len * sizeof(char)); | |
return ar; | |
} | |
message *message_unserialize(const char *buffer) { | |
message *msg = message_new(0, NULL); | |
size_t offset = 0; | |
memcpy(&msg->id, buffer + offset, sizeof(int)); | |
offset += sizeof(int); | |
// I hope you serialized that null terminator | |
msg->text = strdup(buffer + offset); | |
return msg; | |
} | |
int message_send(mqd_t mqdes, const message *msg, unsigned int msg_prio) { | |
archive *ar = message_serialize(msg); | |
int ret = mq_send(mqdes, archive_data(ar), archive_size(ar), msg_prio); | |
archive_free(ar); | |
return ret; | |
} | |
message *message_receive(mqd_t mqdes, char *buffer, size_t size) { | |
struct mq_attr mqstat; | |
if (size == 0) { | |
mq_getattr(mqdes, &mqstat); | |
size = mqstat.mq_msgsize; | |
buffer = (char *)malloc(size * sizeof(char)); | |
} | |
message *msg = NULL; | |
switch (mq_receive(mqdes, buffer, size, NULL)) { | |
case 0: | |
// zero-length message means termination | |
break; | |
case -1: | |
perror("mq_receive"); | |
break; | |
default: | |
msg = message_unserialize(buffer); | |
break; | |
} | |
if (size == 0) { | |
free(buffer); | |
} | |
return msg; | |
} | |
// Thread | |
typedef struct { | |
int tid; | |
mqd_t *pmqdes; | |
} thread_info; | |
static void *start_routine(void *arg) { | |
thread_info const *const tinfo = (thread_info *)arg; | |
mqd_t mqdes = *(tinfo->pmqdes); | |
struct mq_attr mqstat; | |
mq_getattr(mqdes, &mqstat); | |
char *buffer = (char *)malloc(mqstat.mq_msgsize * sizeof(char)); | |
message *msg; | |
while ((msg = message_receive(mqdes, buffer, mqstat.mq_msgsize)) != NULL) { | |
printf("%d [%ld] %d, %s", tinfo->tid, | |
strlen(message_text(msg)), message_id(msg), message_text(msg)); | |
usleep(10000); // Placeholder for long work | |
message_free(msg); | |
} | |
free(buffer); | |
return NULL; | |
} | |
// Main loop | |
int main() { | |
// Get number of system threads | |
const int nthreads = sysconf(_SC_NPROCESSORS_ONLN); | |
// Open a new message queue | |
const char queue_name[] = "/test_queue"; | |
mqd_t mqdes = mq_open(queue_name, O_CREAT | O_RDWR, 0644, NULL); | |
if (mqdes == (mqd_t)-1) { | |
perror("mq_open"); | |
} | |
// Start threads | |
pthread_t *thread = (pthread_t *)malloc(nthreads * sizeof(pthread_t)); | |
thread_info *tinfo = (thread_info *)malloc(nthreads * sizeof(thread_info)); | |
for (int i = 0; i < nthreads; ++i) { | |
tinfo[i].tid = i; | |
tinfo[i].pmqdes = &mqdes; | |
if (pthread_create(&thread[i], NULL, &start_routine, &tinfo[i]) != 0) { | |
printf("Error!\n"); | |
} | |
} | |
// Send messages to all threads via the queue | |
for (int i = 0; i < 100; ++i) { | |
char *text = NULL; | |
asprintf(&text, "Hello mqdes %d\n", i); | |
message *msg = message_new(100 - i, text); | |
free(text); | |
if (message_send(mqdes, msg, 1) != 0) { | |
perror("mq_send message"); | |
} | |
message_free(msg); | |
} | |
// Send empty message to all threads to signal termination | |
for (int i = 0; i < nthreads; ++i) { | |
if (mq_send(mqdes, "", 0, 0) != 0) { | |
perror("mq_send terminate"); // mq_send sets errno | |
} | |
} | |
// Join all the threads | |
for (int i = 0; i < nthreads; ++i) { | |
pthread_join(thread[i], NULL); | |
} | |
free(thread); | |
free(tinfo); | |
// Make sure there are no messages left in the queue | |
struct mq_attr attr; | |
mq_getattr(mqdes, &attr); | |
assert(attr.mq_curmsgs == 0); | |
mq_close(mqdes); | |
mq_unlink(queue_name); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment