Skip to content

Instantly share code, notes, and snippets.

@mrryanjohnston
Last active February 19, 2021 05:27
Show Gist options
  • Save mrryanjohnston/3a619ca356d69a9026dd75d23da6e51e to your computer and use it in GitHub Desktop.
Save mrryanjohnston/3a619ca356d69a9026dd75d23da6e51e to your computer and use it in GitHub Desktop.
ezpzPubSub

ezpzPubSub

A small, POSIX compliant(????) PubSub.

Warning

I don't know what I'm doing as far as c programming goes, so be careful!

Trying stuff out

make

Then, start a server that will listen to messages that it should broadcast to subscribers:

./server

Next, add two (or however many you want) subscribers:

./subscribe

Finally, we can publish events to subscribers:

./publish "Hello, world"

If you ctrl + c the subscriber terminal, it'll return its old PID. You can "re-subscribe" to the mqueue for that subscriber by using the resubscribe <pid> command:

./resubscribe 1337

If you happen to do another publish while a subscriber process is not running, there will still be a /dev/mqueue/pubsub1337 file. The ./resubscribe program "reconnects" you to this queue.

If you no longer want to subscribe to this mqueue and you want the queue to be deleted, you can either do rm /dev/mqueue/pubsub1337 or you can use ./unsubscribe:

./unsubscribe 1337

Development

I run this after a make cleean to start from a blank slate:

rm /dev/mqueue/pubsub*
all: publish resubscribe server subscribe unsubscribe
publish:
gcc publish.c -lrt -o publish
resubscribe:
gcc resubscribe.c -lrt -o resubscribe
server:
gcc server.c -lpthread -lrt -o server
subscribe:
gcc subscribe.c -lrt -o subscribe
unsubscribe:
gcc unsubscribe.c -lrt -o unsubscribe
clean:
rm publish resubscribe server subscribe unsubscribe
#include <mqueue.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
const char *QUEUE_NAME = "/pubsub";
int main(int argc, char *argv []) {
if (argc != 2) {
printf("Error: You must specify 1 argument to publish\n");
exit(1);
}
mqd_t mqd = mq_open(QUEUE_NAME, O_WRONLY);
if (mq_send(mqd, argv[1], strlen(argv[1]) + 1, 0) != 0){
perror("Error publishing message");
return 1;
}
mq_close(mqd);
return 0;
}
#include <mqueue.h>
#include <signal.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
const char *QUEUE_NAME = "/pubsub";
const char *SUBSCRIBER_PATTERN = "/pubsub%s";
volatile sig_atomic_t keep_going = 1;
volatile sig_atomic_t fatal_error_in_progress = 0;
void fatal_error_signal (int sig) {
if (fatal_error_in_progress)
raise (sig);
fatal_error_in_progress = 1;
fprintf(stderr, "\nDEBUG: SIGINT (ctrl+c) received. Exiting program...\n");
pid_t pid = getpid();
signal (sig, SIG_DFL);
raise (sig);
}
void main(int argc, char *argv[]) {
if (argc != 2) {
printf("You must provide the PID provided to you after your subscription closed.\n");
printf("Usage:\n");
printf(" resubscribe 3240\n");
exit(1);
}
signal (SIGINT, fatal_error_signal);
char mq[21];
sprintf(mq, SUBSCRIBER_PATTERN, argv[1]);
// This message queue
// can be mq_receive d upon
// in order to get events published to subscribers
mqd_t mqd = mq_open(mq, O_RDONLY, 0666, NULL);
if (mqd == -1) {
perror("Error re-subscribing to mqueue");
exit(1);
}
char msg_ptr[8192];
while(1) {
if (mq_receive(mqd, msg_ptr, 8193, NULL) != -1) {
printf("DEBUG: Message received from publish event: %s\n", msg_ptr);
}
}
}
#include <dirent.h>
#include <mqueue.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
const char *QUEUE_NAME = "/pubsub";
const char *SUBSCRIBER_PATTERN = "/pubsub%ld";
void main() {
// This message queue
// can be mq_receive d upon
// in order to get events that should be published
struct mq_attr attr;
mqd_t mqd = mq_open(QUEUE_NAME, O_CREAT | O_RDONLY, 0666, NULL);
if (mqd == -1) {
perror("Error creating broadcasting mqueue");
exit(1);
}
char mq[257];
char msg_ptr[8192];
DIR *dir;
struct dirent *entry;
mqd_t smqd;
while(1) {
if (mq_receive(mqd, msg_ptr, 8193, NULL) != -1) {
printf("DEBUG: Message to broadcast: %s\n", msg_ptr);
if ((dir = opendir("/dev/mqueue")) == NULL) {
perror("opendir() error");
exit(1);
} else {
while ((entry = readdir(dir)) != NULL) {
sprintf(mq, "/%s", entry->d_name);
if (strcmp(entry->d_name, "") != 0 && strcmp(mq, QUEUE_NAME) != 0 && strcmp(entry->d_name, ".") != 0 && (strcmp(entry->d_name, "..") != 0)) {
printf("DEBUG: broadcasting to subscriber %s\n", mq);
// This message queue
// can be mq_send d upon
// in order to send events to subscriber queues
smqd = mq_open(mq, O_WRONLY);
if (smqd == -1) {
perror("WARNING: could not connect to subscriber mqueue");
}
if (mq_send(smqd, msg_ptr, strlen(msg_ptr) + 1, 0) != 0){
perror("WARNING: could not publish message to subscriber mqueue");
}
mq_close(smqd);
}
}
closedir(dir);
}
}
}
}
#include <mqueue.h>
#include <signal.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
const char *QUEUE_NAME = "/pubsub";
const char *SUBSCRIBER_PATTERN = "/pubsub%ld";
volatile sig_atomic_t keep_going = 1;
volatile sig_atomic_t fatal_error_in_progress = 0;
void fatal_error_signal (int sig) {
if (fatal_error_in_progress)
raise (sig);
fatal_error_in_progress = 1;
fprintf(stderr, "\nDEBUG: SIGINT (ctrl+c) received. Exiting program...\n");
pid_t pid = getpid();
fprintf(stderr, "DEBUG: Re-subscribe with `resubscribe %d`", pid);
signal (sig, SIG_DFL);
raise (sig);
}
void main() {
signal (SIGINT, fatal_error_signal);
pid_t pid = getpid();
char mq[21];
sprintf(mq, SUBSCRIBER_PATTERN, pid);
// This message queue
// can be mq_receive d upon
// in order to get events published to subscribers
mqd_t mqd = mq_open(mq, O_CREAT | O_EXCL | O_RDONLY, 0666, NULL);
if (mqd == -1) {
perror("Error creating subscriber mqueue");
exit(1);
}
char msg_ptr[8192];
while(1) {
if (mq_receive(mqd, msg_ptr, 8193, NULL) != -1) {
printf("DEBUG: Message received from publish event: %s\n", msg_ptr);
}
}
}
#include <mqueue.h>
#include <signal.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
const char *QUEUE_NAME = "/pubsub";
const char *SUBSCRIBER_PATTERN = "/pubsub%s";
volatile sig_atomic_t keep_going = 1;
volatile sig_atomic_t fatal_error_in_progress = 0;
void fatal_error_signal (int sig) {
if (fatal_error_in_progress)
raise (sig);
fatal_error_in_progress = 1;
fprintf(stderr, "\nDEBUG: SIGINT (ctrl+c) received. Exiting program...\n");
signal (sig, SIG_DFL);
raise (sig);
}
void main(int argc, char *argv[]) {
if (argc != 2) {
printf("You must provide the PID provided to you after your subscription closed.\n");
printf("Usage:\n");
printf(" unsubscribe 3240\n");
exit(1);
}
signal (SIGINT, fatal_error_signal);
char mq[21];
sprintf(mq, SUBSCRIBER_PATTERN, argv[1]);
mqd_t mqd = mq_open(mq, O_RDONLY, 0666, NULL);
if (mqd == -1) {
perror("Error unsubscribing from mqueue");
exit(1);
}
mq_close(mqd);
mq_unlink(mq);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment