Last active
December 18, 2015 19:00
-
-
Save hintjens/5830173 to your computer and use it in GitHub Desktop.
ZMTP 3.0 subscriber
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// ZMTP 3.0 subscriber proof-of-concept | |
// Implements http://rfc.zeromq.org/spec:23 with NULL mechanism | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <stdbool.h> | |
#include <string.h> | |
#include <assert.h> | |
#include <fcntl.h> | |
#include <unistd.h> | |
#include <netinet/in.h> | |
#include <arpa/inet.h> | |
typedef struct { | |
uint8_t flags; // Must be zero | |
uint8_t size; // Size, 0 to 255 uint8_ts | |
uint8_t data [255]; // Message data | |
} zmtp_msg_t; | |
static void | |
derp (char *s) | |
{ | |
perror (s); | |
exit (1); | |
} | |
static void | |
tcp_send (int handle, void *buffer, size_t len) | |
{ | |
if (send (handle, buffer, len, 0) == -1) | |
derp ("send"); | |
} | |
static void | |
tcp_recv (int handle, void *buffer, size_t len) | |
{ | |
size_t len_recd = 0; | |
while (len_recd < len) { | |
ssize_t uint8_ts = recv (handle, buffer + len_recd, len - len_recd, 0); | |
if (uint8_ts == -1) | |
derp ("recv"); | |
len_recd += uint8_ts; | |
} | |
} | |
static void | |
zmtp_recv (int handle, zmtp_msg_t *msg) | |
{ | |
tcp_recv (handle, (uint8_t *) msg, 2); | |
tcp_recv (handle, msg->data, msg->size); | |
} | |
static void | |
zmtp_send (int handle, zmtp_msg_t *msg) | |
{ | |
tcp_send (handle, (uint8_t *) msg, msg->size + 2); | |
} | |
// This is the 3.0 greeting (64 uint8_ts) | |
typedef struct { | |
uint8_t signature [10]; | |
uint8_t version [2]; | |
uint8_t mechanism [20]; | |
uint8_t as_server [1]; | |
uint8_t filler [31]; | |
} zmtp_greeting_t; | |
int main (void) | |
{ | |
puts ("I: starting subscriber"); | |
// Create TCP socket | |
int peer; | |
if ((peer = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) | |
derp ("socket"); | |
// Connect to publisher | |
struct sockaddr_in si_peer = { 0 }; | |
si_peer.sin_family = AF_INET; | |
si_peer.sin_port = htons (9000); | |
si_peer.sin_addr.s_addr = inet_addr ("127.0.0.1"); | |
// Keep trying to connect until we succeed | |
while (connect (peer, (const struct sockaddr *) &si_peer, sizeof (si_peer)) == -1) | |
sleep (1); | |
// This is our greeting (64 octets) | |
zmtp_greeting_t outgoing = { | |
{ 0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F }, | |
{ 3, 0 }, | |
{ 'N', 'U', 'L', 'L', 0 }, | |
{ 0 }, | |
{ 0 } | |
}; | |
// Do full backwards version detection following RFC23 | |
// Send first ten uint8_ts of greeting to peer | |
tcp_send (peer, &outgoing, 10); | |
// Read first uint8_t from peer | |
zmtp_greeting_t incoming; | |
tcp_recv (peer, &incoming, 1); | |
uint8_t length = incoming.signature [0]; | |
if (length == 0xFF) { | |
// Looks like 2.0+, read 9 more uint8_ts to be sure | |
tcp_recv (peer, (uint8_t *) &incoming + 1, 9); | |
if ((incoming.signature [9] & 1) == 1) { | |
// Exchange major version numbers | |
tcp_send (peer, (uint8_t *) &outgoing + 10, 1); | |
tcp_recv (peer, (uint8_t *) &incoming + 10, 1); | |
tcp_send (peer, (uint8_t *) &outgoing + 11, 53); | |
tcp_recv (peer, (uint8_t *) &incoming + 11, 53); | |
// Do NULL handshake - send READY command | |
// For now, empty dictionary | |
zmtp_msg_t ready = { 0x04, 8 }; | |
memcpy (ready.data, "READY ", 8); | |
zmtp_send (peer, &ready); | |
// Now wait for peer's READY command | |
zmtp_recv (peer, &ready); | |
// Command bit isn't set in libzmq | |
// assert (ready.flags == 0x04); | |
assert (memcmp (ready.data, "READY ", 8) == 0); | |
puts ("I: NULL security handshake completed"); | |
} | |
} | |
size_t count = 0; | |
// Send subscription to peer | |
puts ("I: subscribing to test data stream"); | |
zmtp_msg_t msg = { 0 }; | |
msg.size = 1; | |
msg.data [0] = 1; // SUBSCRIBE | |
zmtp_send (peer, &msg); | |
// Get broadcast until it's done | |
while (true) { | |
zmtp_recv (peer, &msg); | |
if (msg.size == 5 && memcmp (msg.data, "WORLD", 5) == 0) | |
break; // Finished | |
count++; | |
} | |
printf ("I: %zd messages received\n", count); | |
close (peer); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment