Skip to content

Instantly share code, notes, and snippets.

@hintjens
Last active December 18, 2015 19:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hintjens/5830173 to your computer and use it in GitHub Desktop.
Save hintjens/5830173 to your computer and use it in GitHub Desktop.
ZMTP 3.0 subscriber
//
// ZMTP 3.0 subscriber proof-of-concept
// Implements http://rfc.zeromq.org/spec:23 with NULL mechanism
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
// A short ZMTP frame laid out exactly as it travels on the wire: one
// flags byte, one size byte, then the message data. zmtp_send and
// zmtp_recv copy this struct to and from the socket byte-for-byte, so
// the member order and member types must not be changed.
typedef struct {
uint8_t flags; // Frame flags; main uses 0 for messages and 0x04 for the READY command
uint8_t size; // Number of valid bytes in data, 0 to 255
uint8_t data [255]; // Message data
} zmtp_msg_t;
// Report a fatal error and terminate the process.
// Prints the label `s` followed by the description of the current errno
// value (via perror), then exits with status 1. Never returns.
// The label is only read, so it is taken as pointer-to-const; callers
// passing string literals or plain char * keep working unchanged.
static void
derp (const char *s)
{
    perror (s);
    exit (1);
}
// Send exactly `len` bytes starting at `buffer` on the connected socket
// `handle`.
// send() is allowed to accept fewer bytes than requested, so keep calling
// it with the unsent remainder until everything has been written.
// Any send() error is fatal: derp() reports it and exits the process.
static void
tcp_send (int handle, const void *buffer, size_t len)
{
    const uint8_t *cursor = buffer;
    size_t len_sent = 0;
    while (len_sent < len) {
        ssize_t nbytes = send (handle, cursor + len_sent, len - len_sent, 0);
        if (nbytes == -1) {
            derp ("send");
        }
        len_sent += (size_t) nbytes;
    }
}
// Receive exactly `len` bytes from the connected socket `handle` into
// `buffer`, looping because recv() may deliver the data in pieces.
// A recv() error is fatal (derp() reports it and exits). A return of 0
// means the peer closed the connection before all bytes arrived; that is
// also fatal, otherwise the loop would spin forever making no progress.
static void
tcp_recv (int handle, void *buffer, size_t len)
{
    // Byte-typed cursor: arithmetic on void * is not valid ISO C
    uint8_t *cursor = buffer;
    size_t len_recd = 0;
    while (len_recd < len) {
        ssize_t nbytes = recv (handle, cursor + len_recd, len - len_recd, 0);
        if (nbytes == -1) {
            derp ("recv");
        }
        if (nbytes == 0) {
            // errno is not set for an orderly shutdown, so do not use perror
            fputs ("recv: connection closed by peer\n", stderr);
            exit (1);
        }
        len_recd += (size_t) nbytes;
    }
}
// Read one short frame from `handle` into `msg`: first the two header
// bytes (flags, then size), then `size` bytes of message data.
static void
zmtp_recv (int handle, zmtp_msg_t *msg)
{
    uint8_t header [2];
    tcp_recv (handle, header, sizeof header);
    msg->flags = header [0];
    msg->size = header [1];
    tcp_recv (handle, msg->data, msg->size);
}
// Write one short frame to `handle`: the two header bytes (flags, size)
// followed by the first `size` bytes of message data, taken straight
// from the in-memory layout of `msg`.
static void
zmtp_send (int handle, zmtp_msg_t *msg)
{
    uint8_t *frame = (uint8_t *) msg;
    size_t frame_len = (size_t) msg->size + 2;
    tcp_send (handle, frame, frame_len);
}
// This is the 3.0 greeting (64 bytes: 10 + 2 + 20 + 1 + 31).
// main sends and receives it piecewise by byte offset from the start of
// the struct, so the member order and sizes must not be changed.
typedef struct {
uint8_t signature [10]; // main sends 0xFF, 0 0 0 0 0 0 0 1, 0x7F
uint8_t version [2]; // main sends { 3, 0 }
uint8_t mechanism [20]; // main sends "NULL", zero padded
uint8_t as_server [1]; // main sends 0
uint8_t filler [31]; // main sends zeros
} zmtp_greeting_t;
// ZMTP 3.0 subscriber entry point.
// Connects to a publisher on 127.0.0.1:9000 (retrying once per second),
// performs the greeting exchange and NULL-mechanism READY handshake,
// subscribes, then counts incoming messages until one whose body is
// exactly "WORLD" arrives. Prints the count and returns 0.
// Socket errors and an invalid READY reply terminate the process with
// exit status 1.
int main (void)
{
    puts ("I: starting subscriber");

    // Publisher address
    struct sockaddr_in si_peer = { 0 };
    si_peer.sin_family = AF_INET;
    si_peer.sin_port = htons (9000);
    si_peer.sin_addr.s_addr = inet_addr ("127.0.0.1");

    // Keep trying to connect until we succeed. POSIX leaves the state of
    // a socket unspecified after connect() fails, so every attempt uses a
    // freshly created TCP socket and closes it again on failure.
    int peer;
    while (true) {
        peer = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (peer == -1) {
            derp ("socket");
        }
        if (connect (peer, (const struct sockaddr *) &si_peer, sizeof (si_peer)) == 0) {
            break;
        }
        close (peer);
        sleep (1);
    }

    // This is our greeting (64 octets)
    zmtp_greeting_t outgoing = {
        { 0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F },
        { 3, 0 },
        { 'N', 'U', 'L', 'L', 0 },
        { 0 },
        { 0 }
    };

    // Do full backwards version detection following RFC23
    // Send first ten bytes of greeting to peer
    tcp_send (peer, &outgoing, 10);

    // Read first byte from peer
    zmtp_greeting_t incoming;
    tcp_recv (peer, &incoming, 1);
    uint8_t length = incoming.signature [0];
    if (length == 0xFF) {
        // Looks like 2.0+, read 9 more bytes to be sure
        tcp_recv (peer, (uint8_t *) &incoming + 1, 9);
        if ((incoming.signature [9] & 1) == 1) {
            // Exchange major version numbers
            tcp_send (peer, (uint8_t *) &outgoing + 10, 1);
            tcp_recv (peer, (uint8_t *) &incoming + 10, 1);
            // Exchange the remaining 53 bytes of the 64-byte greeting
            tcp_send (peer, (uint8_t *) &outgoing + 11, 53);
            tcp_recv (peer, (uint8_t *) &incoming + 11, 53);

            // Do NULL handshake - send READY command
            // For now, empty dictionary
            // The command name is padded with spaces to the 8 octets that
            // are sent and compared. NOTE(review): confirm the padding
            // against the ZMTP revision the peer implements.
            zmtp_msg_t ready = { 0x04, 8 };
            memcpy (ready.data, "READY   ", 8);
            zmtp_send (peer, &ready);

            // Now wait for peer's READY command. The same buffer is
            // reused, so the size must be checked too: a shorter reply
            // would otherwise be compared against our own stale bytes.
            // The command bit isn't set in libzmq, so flags are not
            // checked.
            zmtp_recv (peer, &ready);
            if (ready.size < 8 || memcmp (ready.data, "READY   ", 8) != 0) {
                fputs ("E: peer did not send a READY command\n", stderr);
                exit (1);
            }
            puts ("I: NULL security handshake completed");
        }
    }
    // NOTE(review): when the peer is not detected as 2.0+ the handshake is
    // skipped and the code still falls through to subscribing - confirm
    // that this is intended.

    size_t count = 0;

    // Send subscription to peer
    puts ("I: subscribing to test data stream");
    zmtp_msg_t msg = { 0 };
    msg.size = 1;
    msg.data [0] = 1; // SUBSCRIBE
    zmtp_send (peer, &msg);

    // Get broadcast until it's done
    while (true) {
        zmtp_recv (peer, &msg);
        if (msg.size == 5 && memcmp (msg.data, "WORLD", 5) == 0) {
            break; // Finished
        }
        count++;
    }
    // count is a size_t, so the matching conversion is %zu
    printf ("I: %zu messages received\n", count);
    close (peer);
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment