Skip to content

Instantly share code, notes, and snippets.

@fumiyas
Created March 18, 2018 08:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fumiyas/e3e9101e0aa4d9df5706ca3aafcb7e5d to your computer and use it in GitHub Desktop.
Save fumiyas/e3e9101e0aa4d9df5706ca3aafcb7e5d to your computer and use it in GitHub Desktop.
ZeroMQ server to pull and output strings
/*
ZeroMQ server to pull and output strings
Copyright (C) 2018 SATOH Fumiyasu @ OSS Technology Corp., Japan
License: GNU General Public License version 3
*/
#include <errno.h>
#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <zmq.h>
/*
 * Boolean constants, defined only when a previously included header has
 * not already provided them.
 */
#ifndef TRUE
# define TRUE 1
#endif
#ifndef FALSE
# define FALSE 0
#endif
/* Program name printed as the prefix of every log line; set from argv[0] in main(). */
const char *progname;
/*
 * Print an informational message to stderr.
 *
 * The output is "<progname>: INFO: " followed by the text produced from
 * `fmt` and the variadic arguments, which are interpreted as by
 * vfprintf(). No newline is appended; the caller supplies it in `fmt`.
 */
void pinfo(const char *fmt, ...)
{
	va_list args;

	va_start(args, fmt);
	fprintf(stderr, "%s: INFO: ", progname);
	vfprintf(stderr, fmt, args);
	va_end(args);
}
/*
 * Print an error message to stderr and return to the caller.
 *
 * The output is "<progname>: ERROR: " followed by the text produced from
 * `fmt` and the variadic arguments, which are interpreted as by
 * vfprintf(). No newline is appended; the caller supplies it in `fmt`.
 */
void perr(const char *fmt, ...)
{
	va_list args;

	va_start(args, fmt);
	fprintf(stderr, "%s: ERROR: ", progname);
	vfprintf(stderr, fmt, args);
	va_end(args);
}
/*
 * Print an error message to stderr and terminate the process.
 *
 * The output is "<progname>: ERROR: " followed by the text produced from
 * `fmt` and the variadic arguments, which are interpreted as by
 * vfprintf(). Afterwards the process exits with status `rc`; this
 * function does not return.
 */
void pdie(int rc, const char *fmt, ...)
{
	va_list args;

	va_start(args, fmt);
	fprintf(stderr, "%s: ERROR: ", progname);
	vfprintf(stderr, fmt, args);
	va_end(args);

	exit(rc);
}
int signal_pipe[2];
static void signal_push(int signum)
{
char sigchar = signum;
pinfo("Caught a signal: %d\n", signum);
write(signal_pipe[1], &sigchar, 1);
}
/*
 * Entry point: bind a ZMQ_PULL socket and append every received message,
 * followed by a newline, to the output stream.
 *
 * Usage: <progname> <PORT> [FILE]
 *   PORT  TCP port to listen on; bound as "tcp://*:<PORT>". An argument
 *         that already contains "://" is passed to zmq_bind() unchanged.
 *   FILE  Output file, opened in append mode. Standard output is used
 *         when it is omitted.
 *
 * Signals (delivered to the loop through the self-pipe):
 *   SIGHUP           reopen FILE (e.g. after log rotation)
 *   SIGINT, SIGTERM  unbind the endpoint, drain pending messages, exit
 *
 * Returns 0 on a clean shutdown and 1 if any runtime error occurred.
 */
int main(int argc, const char **argv)
{
	const char *outfile;
	FILE *out = stdout;
	struct sigaction sa;
	int term_p = FALSE;
	int done = FALSE;
	int err, len, rc = 0;
	const char *qsock_port;
	void *qctx, *qsock;
	char qsock_bind_endpoint[1024];
	char qsock_endpoint[1024];
	size_t qsock_endpoint_size = sizeof(qsock_endpoint);
	zmq_pollitem_t pollsocks[2];
	zmq_msg_t msg;
	const char *data;
	const char *nul;
	size_t data_len;

	progname = argv[0];

	if (argc < 2 || argc > 3) {
		printf("Usage: %s <PORT> [FILE]\n", progname);
		exit(1);
	}
	qsock_port = argv[1];
	outfile = (argc >= 3) ? argv[2] : NULL;

	/* Only open a file when one was given; otherwise keep writing to stdout. */
	if (outfile != NULL) {
		out = fopen(outfile, "a+");
		if (out == NULL) {
			pdie(1, "Cannot open output file: %s\n", strerror(errno));
		}
	}

	if (pipe(signal_pipe) == -1) {
		pdie(1, "pipe() failed: %s\n", strerror(errno));
	}

	/* Install the handlers only once the self-pipe they write to exists. */
	memset(&sa, 0, sizeof(sa));
	sa.sa_handler = signal_push;
	sa.sa_flags = SA_RESTART;
	sigaction(SIGHUP, &sa, NULL);
	sigaction(SIGINT, &sa, NULL);
	sigaction(SIGTERM, &sa, NULL);

	/*
	 * zmq_bind() expects "transport://address". Accept a full endpoint
	 * as-is and turn a bare port number into a TCP wildcard endpoint.
	 */
	if (strstr(qsock_port, "://") != NULL) {
		len = snprintf(qsock_bind_endpoint, sizeof(qsock_bind_endpoint),
		    "%s", qsock_port);
	} else {
		len = snprintf(qsock_bind_endpoint, sizeof(qsock_bind_endpoint),
		    "tcp://*:%s", qsock_port);
	}
	if (len < 0 || (size_t)len >= sizeof(qsock_bind_endpoint)) {
		pdie(1, "Endpoint is too long: %s\n", qsock_port);
	}

	qctx = zmq_ctx_new();
	if (qctx == NULL) {
		pdie(1, "zmq_ctx_new() failed: %s\n", strerror(errno));
	}
	qsock = zmq_socket(qctx, ZMQ_PULL);
	if (qsock == NULL) {
		pdie(1, "zmq_socket() failed: %s\n", strerror(errno));
	}
	err = zmq_bind(qsock, qsock_bind_endpoint);
	if (err) {
		pdie(1, "zmq_bind() failed: %s\n", strerror(errno));
	}
	/* Remember the resolved endpoint; it is needed later for zmq_unbind(). */
	err = zmq_getsockopt(qsock, ZMQ_LAST_ENDPOINT, qsock_endpoint, &qsock_endpoint_size);
	if (err) {
		pdie(1, "zmq_getsockopt() failed: %s\n", strerror(errno));
	}

	/* Item 0: read end of the signal pipe. Item 1: the ZMQ PULL socket. */
	pollsocks[0].socket = NULL;
	pollsocks[0].fd = signal_pipe[0];
	pollsocks[0].events = ZMQ_POLLIN;
	pollsocks[0].revents = 0;
	pollsocks[1].socket = qsock;
	pollsocks[1].fd = 0;
	pollsocks[1].events = ZMQ_POLLIN;
	pollsocks[1].revents = 0;

	while (!done) {
		/*
		 * Block until something arrives. Once termination has been
		 * requested, poll without waiting so that already queued
		 * messages are drained and the loop ends when none remain.
		 */
		len = zmq_poll(pollsocks, 2, term_p ? 0 : -1);
		if (len == -1) {
			if (errno == EINTR) {
				continue;
			}
			perr("zmq_poll() failed: %s\n", strerror(errno));
			rc = 1;
			break;
		}
		if (len == 0) {
			/* Nothing left to drain after the endpoint was unbound. */
			break;
		}

		if (pollsocks[0].revents & ZMQ_POLLIN) {
			char sigchar;
			int signum;

			do {
				len = (int)read(signal_pipe[0], &sigchar, 1);
			} while (len == -1 && errno == EINTR);
			if (len == -1) {
				perr("read() failed: %s\n", strerror(errno));
				rc = 1;
				break;
			}
			if (len == 0) {
				perr("read() failed: unexpected EOF on signal pipe\n");
				rc = 1;
				break;
			}

			signum = sigchar;
			switch (signum) {
			case SIGHUP: {
				FILE *out_new;

				if (outfile == NULL) {
					/* No output file specified */
					break;
				}
				pinfo("Reopening output file: %s\n", outfile);
				/* Open the new stream first so the old one survives a failure. */
				out_new = fopen(outfile, "a+");
				if (out_new == NULL) {
					perr("Cannot open output file: %s\n", strerror(errno));
					break;
				}
				if (fclose(out) == EOF) {
					perr("Cannot close output file: %s\n", strerror(errno));
				}
				out = out_new;
				break;
			}
			case SIGINT:
			case SIGTERM:
				if (term_p) {
					/* Already prepared to terminate */
					break;
				}
				pinfo("Unbinding ZMQ endpoint: %s\n", qsock_endpoint);
				err = zmq_unbind(qsock, qsock_endpoint);
				if (err) {
					perr("zmq_unbind() failed: %s\n", strerror(errno));
					rc = 1;
					/* Cannot drain cleanly; leave the main loop with an error. */
					done = TRUE;
					break;
				}
				term_p = TRUE;
				break;
			default:
				perr("Unexpected signal: %d\n", signum);
				break;
			}
			continue;
		}

		if (!(pollsocks[1].revents & ZMQ_POLLIN)) {
			/* No message is ready; do not block in zmq_msg_recv(). */
			continue;
		}

		(void)zmq_msg_init(&msg);
		do {
			len = zmq_msg_recv(&msg, qsock, 0);
		} while (len == -1 && errno == EINTR);
		if (len == -1) {
			perr("zmq_msg_recv() failed: %s\n", strerror(errno));
			(void)zmq_msg_close(&msg);
			rc = 1;
			break;
		}

		/*
		 * The payload is not guaranteed to be NUL-terminated, so never
		 * read past the size reported by zmq_msg_recv(). Output stops
		 * at the first NUL byte, if any, as it would for a C string.
		 */
		data = (const char *)zmq_msg_data(&msg);
		data_len = (size_t)len;
		if (data_len > 0) {
			nul = memchr(data, '\0', data_len);
			if (nul != NULL) {
				data_len = (size_t)(nul - data);
			}
		}
		if (fwrite(data, 1, data_len, out) != data_len
		    || fputc('\n', out) == EOF
		    || fflush(out) == EOF) {
			perr("Cannot write to output: %s\n", strerror(errno));
			(void)zmq_msg_close(&msg);
			rc = 1;
			break;
		}

		err = zmq_msg_close(&msg);
		if (err) {
			perr("zmq_msg_close() failed: %s\n", strerror(errno));
			rc = 1;
			break;
		}
	}

	zmq_close(qsock);
	zmq_ctx_destroy(qctx);
	if (out != stdout && fclose(out) == EOF) {
		perr("Cannot close output file: %s\n", strerror(errno));
		rc = 1;
	}
	return rc;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment