Skip to content

Instantly share code, notes, and snippets.

@danielinux
Created June 22, 2013 13:21
Show Gist options
  • Save danielinux/5840841 to your computer and use it in GitHub Desktop.
Save danielinux/5840841 to your computer and use it in GitHub Desktop.
Simplest ZeroMQ publisher example using PicoTCP
#include <stdint.h>
#include "pico_socket.h"
/*
 * Steps of the publisher-side handshake.
 *
 * The numeric values double as indices into the hs_cb[] dispatch table, so
 * the enumerator order has to stay in sync with that table.
 */
enum zmq_hshake_state {
    ST_LISTEN = 0,  /* initial state; also restored whenever a connection ends */
    ST_CONNECTED,   /* a connection was accepted, nothing sent yet */
    ST_SIGNATURE,   /* our signature has been written */
    ST_VERSION,     /* our version bytes have been written */
    ST_GREETING,    /* our greeting has been written */
    ST_RDY          /* our READY has been written; handshake done */
};

/* Current handshake step; a single value shared by the whole file. */
enum zmq_hshake_state Handshake_state = ST_LISTEN;
/*
 * Handshake step for ST_CONNECTED: send our 10-byte signature to the peer.
 *
 * s: socket the signature is written to.
 *
 * The state advances to ST_SIGNATURE only when all 10 bytes were accepted by
 * pico_socket_write(). Otherwise a diagnostic is printed and the state is
 * left at ST_CONNECTED, so this step runs again the next time a readable
 * event is dispatched in that state. A partially written signature is not
 * resumed.
 */
static void hs_connected(struct pico_socket *s)
{
    /* 0xFF, eight filler bytes (the last one set to 1), then 0x7F. */
    uint8_t my_signature[10] = {0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f};
    int ret;

    ret = pico_socket_write(s, my_signature, (int)sizeof my_signature);
    if (ret != (int)sizeof my_signature) {
        printf("Cannot send signature\n");
        return;
    }

    Handshake_state = ST_SIGNATURE;
}
/*
 * Handshake step for ST_SIGNATURE: read and validate the peer's 10-byte
 * signature, then answer with our 2-byte version (3, 0).
 *
 * s: socket of the peer connection.
 *
 * A signature is accepted when all 10 bytes are available, the first byte is
 * 0xFF and the last byte is 0x7F (the framing ZMTP 3.0 defines for the
 * signature; the eight bytes in between are not significant).
 *
 * On success the state advances to ST_VERSION. On an invalid signature or a
 * failed/short version write, the socket is closed and the state returns to
 * ST_LISTEN.
 */
static void hs_signature(struct pico_socket *s)
{
    uint8_t incoming[10];
    uint8_t my_ver[2] = {3u, 0};
    int ret;

    ret = pico_socket_read(s, incoming, (int)sizeof incoming);
    /* The length test comes first so incoming[] is only inspected when filled. */
    if (ret < (int)sizeof incoming || incoming[0] != 0xFF || incoming[9] != 0x7F) {
        printf("Received invalid signature\n");
        goto fail;
    }

    ret = pico_socket_write(s, my_ver, (int)sizeof my_ver);
    if (ret != (int)sizeof my_ver) {
        printf("Cannot send version\n");
        goto fail;
    }

    Handshake_state = ST_VERSION;
    return;

fail:
    pico_socket_close(s);
    Handshake_state = ST_LISTEN;
}
/*
 * Handshake step for ST_VERSION: read the peer's 2-byte version and, when the
 * major version is 3, send our greeting (mechanism name "NULL" followed by
 * zero padding, 53 bytes in total).
 *
 * s: socket of the peer connection.
 *
 * On success the state advances to ST_GREETING. If the version cannot be
 * read, the major version is not 3, or the greeting cannot be written in
 * full, the socket is closed and the state returns to ST_LISTEN.
 *
 * NOTE(review): the final ZMTP 3.0 greeting is 64 bytes overall, which leaves
 * 52 bytes after signature + version; this code uses 53. The size is kept
 * as-is here -- confirm against the protocol revision this example targets.
 */
static void hs_version(struct pico_socket *s)
{
    uint8_t incoming[2];
    uint8_t my_greeting[53] = {'N','U','L','L', 0};
    int ret;

    ret = pico_socket_read(s, incoming, (int)sizeof incoming);
    if (ret != (int)sizeof incoming) {
        printf("Cannot exchange valid version information\n");
        goto fail;
    }

    if (incoming[0] != 3) {
        printf("Version %d.%d not supported by this publisher\n", incoming[0], incoming[1]);
        goto fail;
    }

    ret = pico_socket_write(s, my_greeting, (int)sizeof my_greeting);
    if (ret != (int)sizeof my_greeting) {
        printf("Cannot send greeting\n");
        goto fail;
    }

    Handshake_state = ST_GREETING;
    return;

fail:
    pico_socket_close(s);
    Handshake_state = ST_LISTEN;
}
/*
 * Handshake step for ST_GREETING: consume the peer's 53-byte greeting and
 * reply with the 8-byte "READY   " token.
 *
 * s: socket of the peer connection.
 *
 * The greeting content is not inspected; only its length is checked.
 * On success the state advances to ST_RDY. If the greeting is not fully
 * available or READY cannot be written in full, the socket is closed and the
 * state returns to ST_LISTEN.
 *
 * NOTE(review): the 53-byte greeting remainder and the space-padded READY
 * token do not match the final ZMTP 3.0 framing -- confirm which protocol
 * revision this example is meant to speak.
 */
static void hs_greeting(struct pico_socket *s)
{
    uint8_t incoming[53];
    uint8_t my_rdy[8] = {'R','E','A','D','Y',' ',' ',' '};
    int ret;

    ret = pico_socket_read(s, incoming, (int)sizeof incoming);
    if (ret != (int)sizeof incoming) {
        printf("Cannot retrieve valid greeting\n");
        goto fail;
    }

    ret = pico_socket_write(s, my_rdy, (int)sizeof my_rdy);
    if (ret != (int)sizeof my_rdy) {
        printf("Cannot send READY\n");
        goto fail;
    }

    Handshake_state = ST_RDY;
    return;

fail:
    pico_socket_close(s);
    Handshake_state = ST_LISTEN;
}
/* Signature shared by every handshake step handler. */
typedef void (*hs_step_fn)(struct pico_socket *);

/*
 * Handshake dispatch table, indexed by the current Handshake_state value.
 * A NULL slot means no handler runs for that state.
 */
static hs_step_fn hs_cb[] = {
    NULL,          /* ST_LISTEN */
    hs_connected,  /* ST_CONNECTED */
    hs_signature,  /* ST_SIGNATURE */
    hs_version,    /* ST_VERSION */
    hs_greeting,   /* ST_GREETING */
    NULL           /* ST_RDY */
};
void cb_tcp0mq(uint16_t ev, struct pico_socket *s)
{
struct pico_ip4 orig;
uint16_t port;
char peer[30];
if (ev & PICO_SOCK_EV_RD) {
if (hs_cb[Handshake_state])
hs_cb[Handshake_state](s);
}
if (ev & PICO_SOCK_EV_CONN) {
pico_socket_accept(s, &orig, &port);
pico_ipv4_to_string(peer, orig.addr);
printf("tcp0mq> Connection established with %s:%d.\n", peer, short_be(port));
Handshake_state = ST_CONNECTED;
}
if (ev & PICO_SOCK_EV_FIN) {
printf("tcp0mq> Connection closed.\n");
Handshake_state = ST_LISTEN;
}
if (ev & PICO_SOCK_EV_ERR) {
printf("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err));
printf("tcp0mq> Connection closed.\n");
Handshake_state = ST_LISTEN;
}
if (ev & PICO_SOCK_EV_CLOSE) {
printf("tcp0mq> event close\n");
pico_socket_close(s);
Handshake_state = ST_LISTEN;
}
if (ev & PICO_SOCK_EV_WR) {
/* TODO: manage pending data */
}
}
/*
 * Entry point of the publisher example: open a TCP socket, bind it to any
 * local address on port 9000, start listening and then drive the picoTCP
 * stack forever.
 *
 * arg: unused.
 *
 * Does not return. Any failure while opening, binding or listening is
 * reported on stdout and terminates the process with exit status 1.
 */
void app_tcp0mq(char *arg)
{
    struct pico_socket *s;
    uint16_t port = short_be(9000);
    struct pico_ip4 inaddr_any = {0};

    (void)arg;

    s = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq);
    if (!s) {
        printf("tcp0mq> socket open failed because %s\n", strerror(pico_err));
        exit(1);
    }

    printf("tcp0mq> BIND\n");
    if (pico_socket_bind(s, &inaddr_any, &port) != 0) {
        printf("tcp0mq> BIND failed because %s\n", strerror(pico_err));
        exit(1);
    }

    printf("tcp0mq> LISTEN\n");
    if (pico_socket_listen(s, 40) != 0) {
        printf("tcp0mq> LISTEN failed because %s\n", strerror(pico_err));
        exit(1);
    }

    /* port is converted back with short_be() for display; cast matches %u. */
    printf("tcp0mq> listening port %u ...\n", (unsigned)short_be(port));

    /* Tick the stack roughly once per millisecond. */
    for (;;) {
        pico_stack_tick();
        usleep(1000);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment