Skip to content

Instantly share code, notes, and snippets.

@nicknezis
Last active April 30, 2018 03:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nicknezis/184a4702ea6c3416edc4bc7dc592f932 to your computer and use it in GitHub Desktop.
Save nicknezis/184a4702ea6c3416edc4bc7dc592f932 to your computer and use it in GitHub Desktop.
Nats.io Waterslide

cnats:

mkdir build
cd build
cmake .. -DCMAKE_C_FLAGS="-fPIC"
sudo make install

Waterslide

export HASNATS=1
make -j
...
ifdef HASNATS
proc_nats_in$(WS_SFX): proc_nats_in.c
$(SHOWFILE)
$(CPP) $(CPPFLAGS) -I/usr/local/include $< -o $@ /usr/local/lib/libnats_static.a $(LDFLAGS)
$(INSTALL) $@ $(WS_PROCS_DIR)
else
proc_nats_in$(WS_SFX): proc_nats_in.c
@echo " not building $@, set HASNATS to build"
endif
ifdef HASNATS
proc_nats_out$(WS_SFX): proc_nats_out.c
$(SHOWFILE)
$(CPP) $(CPPFLAGS) -I/usr/local/include $< -o $@ /usr/local/lib/libnats_static.a $(LDFLAGS)
$(INSTALL) $@ $(WS_PROCS_DIR)
else
proc_nats_out$(WS_SFX): proc_nats_out.c
@echo " not building $@, set HASNATS to build"
endif
...
//catch buffers of tcp messages from a tcp thrower application..
// in terms of a client-server model, this is a client that connects to a
// server
#define PROC_NAME "nats_in"
//#define DEBUG 1
#include <stdlib.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
// #include <stropts.h>
// #include <poll.h>
#include "metaproc.h"
#include "metaprocdata.h"
#include "procloader.h"
#include "sysutil.h"
#include "datatypes/mpdt_tuple.h"
#include "datatypes/mpdt_fixedstring.h"
#include "datatypes/mpdt_uint.h"
#include "datatypes/mpdt_uint64.h"
#include "datatypes/mpdt_double.h"
#include <nats/nats.h>
// #include "tcp_rw.h"
char proc_name[] = PROC_NAME;
char proc_version[] = "0.1";
char *proc_tags[] = {"source", "input", NULL};
char *proc_alias[] = { NULL };
char proc_purpose[] = "reads from gnatsd server, creates binary data for parsing";
char proc_description[] = "";
proc_option_t proc_opts[] = {
/* 'option character', "long option string", "option argument",
"option description", <allow multiple>, <required>*/
{'u',"","url",
"connection URL",0,0},
// {'p',"","port",
// "Use listening port",0,0},
// {'w',"","",
// "dont wait till data is available",0,0},
// {'b',"","",
// "Use blocking sockets (default: non-blocking)", 0,0},
//the following must be left as-is to signify the end of the array
{' ',"","",
"",0,0}
};
char proc_nonswitch_opts[] = "";
char *proc_input_types[] = {NULL};
// (Potential) Output types: bin
char *proc_output_types[] = {"any", NULL};
char proc_requires[] = "nats_out should be used in a separately running process";
// Ports:
proc_port_t proc_input_ports[] = {{NULL, NULL}};
char *proc_tuple_container_labels[] = {NULL};
char *proc_tuple_conditional_container_labels[] = {NULL};
char *proc_tuple_member_labels[] = {NULL};
// char *proc_synopsis[] = {"tcpcatch -h <hostname> -p <port> [-s] [-w] [-b]", NULL};
char *proc_synopsis[] = {"nats_in -u <url> -t <topic>", NULL};
proc_example_t proc_examples[] = {
{"nats_in -u 'nats://localhost:4222' -t foo.bar | ...", "subscribe to events and utilize normal event processing in the subsequent pipeline."},
{NULL, NULL}
};
//function prototypes for local functions
static int data_source(void *, mpdata_t*, mp_doutput_t*, int);
typedef struct _proc_instance_t {
natsConnection *nc;
natsSubscription *sub;
natsMsg *msg;
char *url;
char *topic;
uint64_t meta_process_cnt;
uint64_t loops;
mp_outtype_t * outtype_bin;
// tcp_catch_t * tcpc;
// char * hostname;
// int wait;
// uint16_t port;
// int blocking;
} proc_instance_t;
static int proc_cmd_options(int argc, char ** argv,
proc_instance_t * proc, void * type_table) {
int op;
// int server = 0;
while ((op = getopt(argc, argv, "u:t:")) != EOF) {
switch (op) {
case 'u':
proc->url = strdup(optarg);
break;
case 't':
proc->topic = strdup(optarg);
tool_print("Topic: %s", proc->topic);
break;
default:
return 0;
}
}
if (NULL == proc->url) {
return 0;
}
return 1;
}
// the following is a function to take in command arguments and initalize
// this processor's instance..
// also register as a source here..
// return 1 if ok
// return 0 if fail
int proc_init(mpkid_t * kid, int argc, char ** argv, void ** vinstance, mp_sourcev_t * sv,
void * type_table) {
//allocate proc instance of this processor
proc_instance_t * proc =
(proc_instance_t*)calloc(1,sizeof(proc_instance_t));
*vinstance = proc;
proc->loops = 0;
proc->meta_process_cnt = 0;
//read in command options
if (!proc_cmd_options(argc, argv, proc, type_table)) {
return 0;
}
proc->outtype_bin =
mp_register_source_byname(type_table, "BINARY_TYPE", data_source, sv);
natsStatus s;
// s = natsOptions_SetErrorHandler(opts, asyncCb, NULL);
// if (s == NATS_OK)
// Connects to the NATS Server
s = natsConnection_ConnectTo(&proc->nc, proc->url);
// Simple synchronous subscriber
if (s == NATS_OK)
natsConnection_SubscribeSync(&proc->sub, proc->nc, proc->topic);
return 1;
}
// this function needs to decide on processing function based on datatype
// given.. also set output types as needed (unless a sink)
//return 1 if ok
// return 0 if problem
proc_process_t proc_input_set(void * vinstance, mpdatatype_t * input_type,
mplabel_t * port,
mp_outlist_t* olist, int type_index,
void * type_table) {
return NULL;
}
//// proc processing function assigned to a specific data type in proc_io_init
//return 1 if output is available
// return 0 if not output
//
static int data_source(void * vinstance, mpdata_t* source_data,
mp_doutput_t * dout, int type_index) {
proc_instance_t * proc = (proc_instance_t*)vinstance;
proc->loops++;
natsStatus s;
s = natsSubscription_NextMsg(&proc->msg, proc->sub, 1000);
if (s == NATS_OK) {
// natsMsg *msg = proc->msg;
int msglen = natsMsg_GetDataLength(proc->msg);
tool_print("msglen: %d", msglen);
proc->meta_process_cnt += msglen;
// allocate binary dtype for Waterslide use
mpdata_t *outdata = dtype_alloc_binary(msglen);
if (!outdata) {
return 0;
}
mpdt_binary_t *bin = (mpdt_binary_t*)outdata->data;
memcpy(bin->buf, natsMsg_GetData(proc->msg), msglen);
mp_set_outdata(outdata, proc->outtype_bin, dout);
natsMsg_Destroy(proc->msg);
}
// proc->meta_process_cnt += tcp_catch_data(proc->tcpc, proc->outtype_bin,
// source_data, dout);
return 1;
}
//return 1 if successful
//return 0 if no..
int proc_destroy(void * vinstance) {
proc_instance_t * proc = (proc_instance_t*)vinstance;
tool_print("output cnt %" PRIu64, proc->meta_process_cnt);
tool_print("polling loops cnt %" PRIu64, proc->loops);
// TODO: Cleanup NATS connection/subscription
natsSubscription_Destroy(proc->sub);
natsConnection_Destroy(proc->nc);
// tcp_catch_destroy(proc->tcpc);
//free dynamic allocations
free(proc->url);
free(proc);
return 1;
}
//catch buffers of tcp messages from a tcp thrower application..
// in terms of a client-server model, this is a client that connects to a
// server
#define PROC_NAME "nats_out"
//#define DEBUG 1
#include <stdlib.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
// #include <stropts.h>
// #include <poll.h>
#include "waterslide.h"
#include "waterslidedata.h"
#include "procloader.h"
#include "sysutil.h"
#include "datatypes/wsdt_tuple.h"
#include "datatypes/wsdt_fixedstring.h"
#include "datatypes/wsdt_uint.h"
#include "datatypes/wsdt_uint64.h"
#include "datatypes/wsdt_double.h"
#include <nats/nats.h>
char proc_name[] = PROC_NAME;
char proc_version[] = "0.1";
char *proc_tags[] = {"source", "output", NULL};
char *proc_alias[] = { NULL };
char proc_purpose[] = "writes messages to a Nats.io topic";
char proc_description[] = "";
proc_option_t proc_opts[] = {
/* 'option character', "long option string", "option argument",
"option description", <allow multiple>, <required>*/
{'u',"","url",
"connection URL",0,0},
{'t',"","topic",
"Topic name",0,0},
{'v',"","",
"verbose output",0,0},
// {'p',"","port",
// "Use listening port",0,0},
// {'w',"","",
// "dont wait till data is available",0,0},
// {'b',"","",
// "Use blocking sockets (default: non-blocking)", 0,0},
//the following must be left as-is to signify the end of the array
{' ',"","",
"",0,0}
};
char proc_nonswitch_opts[] = "";
char *proc_input_types[] = {"any", NULL};
// (Potential) Output types: bin
char *proc_output_types[] = {NULL};
char proc_requires[] = "nats_in should be used in a separately running process";
// Ports:
proc_port_t proc_input_ports[] = {{NULL, NULL}};
char *proc_tuple_container_labels[] = {NULL};
char *proc_tuple_conditional_container_labels[] = {NULL};
char *proc_tuple_member_labels[] = {NULL};
// char *proc_synopsis[] = {"tcpcatch -h <hostname> -p <port> [-s] [-w] [-b]", NULL};
char *proc_synopsis[] = {"nats_out -u <url> -t <topic>", NULL};
proc_example_t proc_examples[] = {
{"... | nats_out -u 'nats://localhost:4222' -t foo.bar ", "publish events"},
{NULL, NULL}
};
//function prototypes for local functions
static int proc_binary(void *, wsdata_t*, ws_doutput_t*, int);
typedef struct _proc_instance_t {
natsConnection *nc;
natsSubscription *sub;
natsMsg *msg;
char *url;
char *topic;
int verbose;
int cntReportInterval;
uint64_t meta_process_cnt;
uint64_t nextReportCnt;
uint64_t loops;
ws_outtype_t * outtype_bin;
} proc_instance_t;
static int proc_cmd_options(int argc, char ** argv,
proc_instance_t * proc, void * type_table) {
int op;
// int server = 0;
while ((op = getopt(argc, argv, "u:t:v")) != EOF) {
switch (op) {
case 'u':
proc->url = strdup(optarg);
break;
case 't':
proc->topic = strdup(optarg);
tool_print("Topic: %s", proc->topic);
break;
case 'v':
proc->verbose++;
break;
default:
return 0;
}
}
if (NULL == proc->url) {
return 0;
}
return 1;
}
// the following is a function to take in command arguments and initalize
// this processor's instance..
// also register as a source here..
// return 1 if ok
// return 0 if fail
int proc_init(wskid_t * kid, int argc, char ** argv, void ** vinstance, ws_sourcev_t * sv,
void * type_table) {
//allocate proc instance of this processor
proc_instance_t * proc =
(proc_instance_t*)calloc(1,sizeof(proc_instance_t));
*vinstance = proc;
proc->loops = 0;
proc->meta_process_cnt = 0;
//read in command options
if (!proc_cmd_options(argc, argv, proc, type_table)) {
return 0;
}
// proc->outtype_bin =
// ws_register_source_byname(type_table, "BINARY_TYPE", data_source, sv);
natsStatus s;
// s = natsOptions_SetErrorHandler(opts, asyncCb, NULL);
// if (s == NATS_OK)
// Connects to the NATS Server
s = natsConnection_ConnectTo(&proc->nc, proc->url);
// Simple synchronous subscriber
if (s == NATS_OK) {
return 1;
} else {
tool_print("Error: %d", s);
return 0;
}
// natsConnection_SubscribeSync(&proc->sub, proc->nc, proc->topic);
// return 1;
}
// this function needs to decide on processing function based on datatype
// given.. also set output types as needed (unless a sink)
//return 1 if ok
// return 0 if problem
proc_process_t proc_input_set(void * vinstance, wsdatatype_t * input_type,
wslabel_t * port,
ws_outlist_t* olist, int type_index,
void * type_table) {
//accept only binary datatypes
if (wsdatatype_match(type_table, input_type, "BINARY_TYPE")) {
return proc_binary;
}
return NULL;
}
//// proc processing function assigned to a specific data type in proc_io_init
//return 1 if output is available
// return 0 if not output
//
static int proc_binary(void * vinstance, wsdata_t* input_data,
ws_doutput_t * dout, int type_index) {
proc_instance_t * proc = (proc_instance_t*)vinstance;
proc->meta_process_cnt++;
if (proc->verbose &&
proc->meta_process_cnt >= proc->nextReportCnt) {
// struct timeval ctime;
// gettimeofday(&ctime, 0);
// if (proc->meta_process_cnt > 1) {
// double sec = ctime.tv_sec - proc->last_time.tv_sec +
// ((double)(ctime.tv_usec - proc->last_time.tv_usec)) /
// 1000000.0;
// double rate = (proc->meta_process_cnt - proc->last_cnt) / sec;
// tool_print("DUMP: %"PRIu64" records, %"PRIu64" sent, %"PRIu64
// " dropped in %0.2f sec (%lf records/sec)",
// proc->meta_process_cnt - proc->last_cnt,
// proc->tcpt->sent - proc->last_sent,
// proc->tcpt->dropped - proc->last_dropped,
// sec, rate);
// }
// proc->last_cnt = proc->meta_process_cnt;
// proc->last_sent = proc->tcpt->sent;
// proc->last_dropped = proc->tcpt->dropped;
// proc->last_time = ctime;
// proc->nextReportCnt = proc->meta_process_cnt +
// proc->cntReportInterval;
}
wsdt_binary_t *bin = (wsdt_binary_t*) input_data->data;
natsConnection_Publish(proc->nc, proc->topic, bin->buf, bin->len);
return 1;
}
// int nat
// //// proc processing function assigned to a specific data type in proc_io_init
// //return 1 if output is available
// // return 0 if not output
// //
// static int data_source(void * vinstance, wsdata_t* source_data,
// ws_doutput_t * dout, int type_index) {
// proc_instance_t * proc = (proc_instance_t*)vinstance;
// proc->loops++;
// natsStatus s;
// s = natsSubscription_NextMsg(&proc->msg, proc->sub, 1000);
// if (s == NATS_OK) {
// // natsMsg *msg = proc->msg;
// int msglen = natsMsg_GetDataLength(proc->msg);
// tool_print("msglen: %d", msglen);
// proc->meta_process_cnt += msglen;
// // allocate binary dtype for Waterslide use
// wsdata_t *outdata = dtype_alloc_binary(msglen);
// if (!outdata) {
// return 0;
// }
// wsdt_binary_t *bin = (wsdt_binary_t*)outdata->data;
// memcpy(bin->buf, natsMsg_GetData(proc->msg), msglen);
// ws_set_outdata(outdata, proc->outtype_bin, dout);
// natsMsg_Destroy(proc->msg);
// }
// // proc->meta_process_cnt += tcp_catch_data(proc->tcpc, proc->outtype_bin,
// // source_data, dout);
// return 1;
// }
//return 1 if successful
//return 0 if no..
int proc_destroy(void * vinstance) {
proc_instance_t * proc = (proc_instance_t*)vinstance;
tool_print("output cnt %" PRIu64, proc->meta_process_cnt);
tool_print("polling loops cnt %" PRIu64, proc->loops);
// TODO: Cleanup NATS connection/subscription
// natsSubscription_Destroy(proc->sub);
natsConnection_Destroy(proc->nc);
// tcp_catch_destroy(proc->tcpc);
//free dynamic allocations
free(proc->url);
free(proc);
return 1;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment