Skip to content

Instantly share code, notes, and snippets.

@claws
Last active November 24, 2016 12:35
Show Gist options
  • Save claws/7188924 to your computer and use it in GitHub Desktop.
Save claws/7188924 to your computer and use it in GitHub Desktop.
A candidate CZMQ implementation for a ZMQ socket event monitor.
// Monitor socket transport events (tcp and ipc only)
#include "../include/czmq.h"
// remove this when zsockmon.h is properly moved into ../include
// and added to ../include/czmq.h (via adding to model/project.xml)
#include "zsockmon.h"
// Structure of our class: the application-side handle for one socket monitor
struct _zsockmon_t {
zctx_t *ctx; // 0MQ context; NOTE(review): not assigned anywhere in the visible code - confirm it is needed
void *pipe; // Pipe through to backend agent (returned by zthread_fork, exposed by zsockmon_socket)
};
// Background task does the real I/O. It is started with zthread_fork by
// zsockmon_new and is defined further down in this file.
static void
s_agent_task (void *args, zctx_t *ctx, void *pipe);
// --------------------------------------------------------------------------
// Create a new socket monitor
//
// Registers an inproc monitor endpoint on 'socket' for the 'events' mask,
// then forks a background agent that connects to that endpoint and relays
// events to the caller over a pipe (see zsockmon_socket).
//
// Returns the new monitor, or NULL if the agent thread could not be started,
// the handshake was interrupted, or the agent did not answer "OK".
zsockmon_t *
zsockmon_new (zctx_t *ctx, void *socket, int events)
{
    zsockmon_t *self = (zsockmon_t *) zmalloc (sizeof (zsockmon_t));

    // The endpoint name is derived from the socket address so every
    // monitored socket gets its own endpoint. A stack buffer with a bounded
    // write replaces the former heap buffer + sprintf.
    char monitor_endpoint [100];
    snprintf (monitor_endpoint, sizeof (monitor_endpoint),
              "inproc://zsockmon-%p", socket);
    int rc = zmq_socket_monitor (socket, monitor_endpoint, events);
    assert (rc == 0);

    // Start background agent to connect to the inproc monitor socket
    self->pipe = zthread_fork (ctx, s_agent_task, NULL);
    if (!self->pipe) {
        // No agent is running, so there is nobody to send TERMINATE to;
        // just release the handle.
        free (self);
        return NULL;
    }
    // Configure backend agent with monitor endpoint
    zstr_send (self->pipe, "%s", monitor_endpoint);

    // The agent answers "OK" once connected. A NULL status means the receive
    // was interrupted and is treated as a failure too.
    char *status = zstr_recv (self->pipe);
    if (!status || strneq (status, "OK"))
        zsockmon_destroy (&self);
    free (status);
    return self;
}
// --------------------------------------------------------------------------
// Destructor
//
// Tells the backend agent to terminate, waits for its reply on the pipe,
// then frees the monitor and nulls the caller's pointer. Does nothing when
// *self_p is already NULL.
void
zsockmon_destroy (zsockmon_t **self_p)
{
    assert (self_p);
    zsockmon_t *monitor = *self_p;
    if (!monitor)
        return;

    // Ask the agent to stop, then wait for and discard its acknowledgement
    zstr_send (monitor->pipe, "TERMINATE");
    char *reply = zstr_recv (monitor->pipe);
    free (reply);

    free (monitor);
    *self_p = NULL;
}
// --------------------------------------------------------------------------
// Return the pipe socket connected to the backend agent, so the caller can
// poll it or receive socket event messages from it.
void *
zsockmon_socket (zsockmon_t *self)
{
    assert (self);
    void *agent_pipe = self->pipe;
    return agent_pipe;
}
// --------------------------------------------------------------------------
// Enable or disable verbose tracing of commands and activity in the agent.
// Sent as a two-frame message: "VERBOSE" followed by the flag as a decimal
// number.
void
zsockmon_set_verbose (zsockmon_t *self, bool verbose)
{
    assert (self);
    void *agent_pipe = self->pipe;
    zstr_sendm (agent_pipe, "VERBOSE");
    zstr_send (agent_pipe, "%d", verbose);
}
// --------------------------------------------------------------------------
// Self test of this class

// Test helper: wait up to 500 msec for a message on socket 's', read the
// event number from its first frame, and report whether it equals
// 'expected_event'. Asserts that the socket became readable in time.
static bool
s_check_event (void *s, int expected_event, bool verbose)
{
    zpoller_t *poller = zpoller_new (s, NULL);
    void *ready = zpoller_wait (poller, 500);
    assert (ready);
    assert (ready == s);

    // First frame of the message carries the event number as text
    zmsg_t *message = zmsg_recv (s);
    char *event_text = zmsg_popstr (message);
    int actual_event = atoi (event_text);
    free (event_text);
    zmsg_destroy (&message);
    zpoller_destroy (&poller);

    bool matched = (actual_event == expected_event);
    if (verbose)
        printf ("expected event (%d) == (%d) actual event: %s\n",
                expected_event, actual_event, matched ? "PASS" : "FAIL");
    return matched;
}
// Self test: monitors a PULL sink and a PUSH source over TCP port 5555 and
// checks that the LISTENING, CONNECTED, ACCEPTED and DISCONNECTED events
// arrive in that order. Each step asserts on failure.
void
zsockmon_test (bool verbose)
{
    printf (" * zsockmon: ");
    if (verbose)
        printf ("\n");

    zctx_t *ctx = zctx_new ();

    // Sink side: monitor listen/accept events
    void *sink = zsocket_new (ctx, ZMQ_PULL);
    int sink_events = ZMQ_EVENT_LISTENING | ZMQ_EVENT_ACCEPTED;
    zsockmon_t *sinkmon = zsockmon_new (ctx, sink, sink_events);
    zsockmon_set_verbose (sinkmon, verbose);
    void *sink_feed = zsockmon_socket (sinkmon);

    // Binding must produce a LISTENING event on the sink monitor
    zsocket_bind (sink, "tcp://*:5555");
    bool listening = s_check_event (sink_feed, ZMQ_EVENT_LISTENING, verbose);
    if (verbose)
        printf ("bind - %s\n", listening ? "PASS" : "FAIL");
    assert (listening);

    // Source side: monitor connect/disconnect events
    void *source = zsocket_new (ctx, ZMQ_PUSH);
    int source_events = ZMQ_EVENT_CONNECTED | ZMQ_EVENT_DISCONNECTED;
    zsockmon_t *sourcemon = zsockmon_new (ctx, source, source_events);
    zsockmon_set_verbose (sourcemon, verbose);
    void *source_feed = zsockmon_socket (sourcemon);
    zsocket_connect (source, "tcp://localhost:5555");

    // Source must report that it connected to the sink
    bool connected = s_check_event (source_feed, ZMQ_EVENT_CONNECTED, verbose);
    if (verbose)
        printf ("connect - %s\n", connected ? "PASS" : "FAIL");
    assert (connected);

    // Sink must report that it accepted the connection
    bool accepted = s_check_event (sink_feed, ZMQ_EVENT_ACCEPTED, verbose);
    if (verbose)
        printf ("accepted - %s\n", accepted ? "PASS" : "FAIL");
    assert (accepted);

    // Destroying the sink must trigger a DISCONNECTED event on the source
    zsocket_destroy (ctx, sink);
    bool disconnected = s_check_event (source_feed, ZMQ_EVENT_DISCONNECTED, verbose);
    if (verbose)
        printf ("disconnect - %s\n", disconnected ? "PASS" : "FAIL");
    assert (disconnected);

    zsockmon_destroy (&sinkmon);
    zsockmon_destroy (&sourcemon);
    zctx_destroy (&ctx);
}
// --------------------------------------------------------------------------
// Backend agent implementation
// Agent instance: state used by the background task for one monitor
typedef struct {
zctx_t *ctx; // Context handed to the agent task; used to create 'mon'
void *pipe; // Socket back to application
void *mon; // monitor socket (ZMQ_PAIR connected to 'endpoint')
char *endpoint; // monitor endpoint string; freed by s_agent_destroy
bool verbose; // Trace activity to stdout
bool terminated; // Set by the TERMINATE command to end the agent loop
} agent_t;
// Prototypes for local functions we use in the agent
// Allocate the agent, connect its monitor socket and report status on the pipe
static agent_t *
s_agent_new (zctx_t *ctx, void *pipe, char *endpoint);
// Handle one command (TERMINATE or VERBOSE) received from the API pipe
static void
s_api_command (agent_t *self);
// Read one event from the monitor socket and forward it on the pipe
static void
s_event_recv (agent_t *self);
// Free the agent and its endpoint string
static void
s_agent_destroy (agent_t **self_p);
// This is the background task that monitors socket events.
// It first reads the monitor endpoint from the pipe, then services API
// commands and monitor events until it is interrupted or told to terminate.
// 'args' is unused.
static void
s_agent_task (void *args, zctx_t *ctx, void *pipe)
{
    // The first message on the pipe is the monitor endpoint
    char *monitor_endpoint = zstr_recv (pipe);
    assert (monitor_endpoint);

    agent_t *agent = s_agent_new (ctx, pipe, monitor_endpoint);
    zpoller_t *poller = zpoller_new (agent->pipe, agent->mon, NULL);

    // Loop until interrupted or a TERMINATE command has been handled
    while (!zctx_interrupted && !agent->terminated) {
        // Block until the API pipe or the monitor socket is readable
        void *ready = zpoller_wait (poller, -1);
        if (ready == NULL)
            break;              // Interrupted

        if (ready == agent->pipe)
            s_api_command (agent);
        if (ready == agent->mon)
            s_event_recv (agent);
    }
    zpoller_destroy (&poller);
    s_agent_destroy (&agent);
}
// --------------------------------------------------------------------------
// Create and initialize new agent instance
//
// Stores 'endpoint' in the agent (it is freed later by s_agent_destroy),
// creates a PAIR socket and connects it to the monitor endpoint, then
// reports "OK" or "ERROR" on the pipe. The agent is returned in both cases.
static agent_t *
s_agent_new (zctx_t *ctx, void *pipe, char *endpoint)
{
    agent_t *self = (agent_t *) malloc (sizeof (agent_t));
    assert (self);
    self->ctx = ctx;
    self->pipe = pipe;
    self->endpoint = endpoint;
    self->verbose = false;
    self->terminated = false;

    // Connect to the socket monitor inproc endpoint. zsocket_connect takes a
    // printf-style format, so the endpoint is passed as an argument rather
    // than as the format string itself.
    self->mon = zsocket_new (self->ctx, ZMQ_PAIR);
    assert (self->mon);
    if (zsocket_connect (self->mon, "%s", self->endpoint) == 0)
        zstr_send (self->pipe, "OK");
    else
        zstr_send (self->pipe, "ERROR");
    return self;
}
// --------------------------------------------------------------------------
// Handle command from API
//
// Commands read from the pipe:
//   "TERMINATE"          - mark the agent terminated and reply "OK"
//   "VERBOSE" + "<flag>" - enable tracing when the flag frame starts with '1'
// Anything else is reported on stdout. If the receive fails (NULL command),
// the agent is marked terminated so the task loop exits instead of
// dereferencing a NULL string.
static void
s_api_command (agent_t *self)
{
    char *command = zstr_recv (self->pipe);
    if (!command) {
        self->terminated = true;
        return;
    }
    if (self->verbose)
        printf ("I: received api command: %s\n", command);

    if (streq (command, "TERMINATE")) {
        self->terminated = true;
        zstr_send (self->pipe, "OK");
    }
    else
    if (streq (command, "VERBOSE")) {
        // The flag travels in a second frame; leave the setting untouched
        // if that frame could not be read.
        char *verbose = zstr_recv (self->pipe);
        if (verbose) {
            self->verbose = *verbose == '1';
            free (verbose);
        }
    }
    else
        printf ("E: unexpected API command '%s'\n", command);

    free (command);
}
// --------------------------------------------------------------------------
// Handle event from socket monitor
//
// Reads one two-frame monitor message (frame 1: event id followed by event
// value; frame 2: endpoint address) and forwards it to the application as a
// four-frame message: event id, event value, address, description.
//
// Interrupted receives and malformed messages are dropped. Unrecognised
// events are forwarded with the description "Unknown". An address longer
// than the local buffer is truncated.
static void
s_event_recv (agent_t *self)
{
    zmq_event_t event;
    char addr [1025];
    // Always initialised, so the default branch below never hands an
    // indeterminate pointer to printf or zmsg_addstr
    const char *description = "Unknown";

    // Take the whole message at once so a bad frame cannot leave the
    // monitor socket out of step for the next event
    zmsg_t *incoming = zmsg_recv (self->mon);
    if (!incoming)
        return;                 // Interrupted

    zframe_t *header = zmsg_first (incoming);
    zframe_t *address = zmsg_next (incoming);
    if (!header || !address
    ||  zframe_size (header) < sizeof (event.event) + sizeof (event.value)) {
        if (self->verbose)
            printf ("E: malformed socket monitor event\n");
        zmsg_destroy (&incoming);
        return;
    }
    // Extract id of the event as bitfield
    memcpy (&(event.event), zframe_data (header), sizeof (event.event));
    // Extract value which is either error code, fd or reconnect interval
    memcpy (&(event.value),
            zframe_data (header) + sizeof (event.event),
            sizeof (event.value));

    // Copy address part, clamped to the buffer, and null-terminate it
    size_t addr_size = zframe_size (address);
    if (addr_size > sizeof (addr) - 1)
        addr_size = sizeof (addr) - 1;
    memcpy (addr, zframe_data (address), addr_size);
    addr [addr_size] = '\0';
    zmsg_destroy (&incoming);

    switch (event.event) {
        case ZMQ_EVENT_ACCEPTED:
            description = "Accepted";
            break;
        case ZMQ_EVENT_ACCEPT_FAILED:
            description = "Accept failed";
            break;
        case ZMQ_EVENT_BIND_FAILED:
            description = "Bind failed";
            break;
        case ZMQ_EVENT_CLOSED:
            description = "Closed";
            break;
        case ZMQ_EVENT_CLOSE_FAILED:
            description = "Close failed";
            break;
        case ZMQ_EVENT_DISCONNECTED:
            description = "Disconnected";
            break;
        case ZMQ_EVENT_CONNECTED:
            description = "Connected";
            break;
        case ZMQ_EVENT_CONNECT_DELAYED:
            description = "Connect delayed";
            break;
        case ZMQ_EVENT_CONNECT_RETRIED:
            description = "Connect retried";
            break;
        case ZMQ_EVENT_LISTENING:
            description = "Listening";
            break;
        case ZMQ_EVENT_MONITOR_STOPPED:
            description = "Monitor stopped";
            break;
        default:
            // Report the event id (not its value) and end the line
            if (self->verbose)
                printf ("Unknown socket monitor event: %d\n", (int) event.event);
            break;
    }
    if (self->verbose)
        printf ("EV: %s - %s\n", description, addr);

    zmsg_t *msg = zmsg_new ();
    zmsg_addstr (msg, "%d", (int) event.event);
    zmsg_addstr (msg, "%d", (int) event.value);
    zmsg_addstr (msg, "%s", addr);
    zmsg_addstr (msg, "%s", description);
    zmsg_send (&msg, self->pipe);
}
// --------------------------------------------------------------------------
// Destroy agent instance
//
// Frees the endpoint string and the agent itself, then nulls the caller's
// pointer. Does nothing when *self_p is already NULL.
static void
s_agent_destroy (agent_t **self_p)
{
    assert (self_p);
    agent_t *agent = *self_p;
    if (agent == NULL)
        return;

    free (agent->endpoint);
    free (agent);
    *self_p = NULL;
}
#ifndef __ZSOCKMON_H_INCLUDED__
#define __ZSOCKMON_H_INCLUDED__
#ifdef __cplusplus
extern "C" {
#endif
// Opaque class structure
typedef struct _zsockmon_t zsockmon_t;
// @interface
// Create a new socket monitor for 'socket', watching the given 'events'
// mask. Returns NULL if the backend agent does not report success.
CZMQ_EXPORT zsockmon_t *
zsockmon_new (zctx_t *ctx, void *socket, int events);
// Destroy a socket monitor and set the caller's pointer to NULL
CZMQ_EXPORT void
zsockmon_destroy (zsockmon_t **self_p);
// Get zsockmon pipe socket, for polling or receiving messages
CZMQ_EXPORT void *
zsockmon_socket (zsockmon_t *self);
// Enable verbose tracing of commands and activity
CZMQ_EXPORT void
zsockmon_set_verbose (zsockmon_t *self, bool verbose);
// Self test of this class
CZMQ_EXPORT void
zsockmon_test (bool verbose);
// @end
#ifdef __cplusplus
}
#endif
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment