|
//catch buffers of tcp messages from a tcp thrower application.. |
|
// in terms of a client-server model, this is a client that connects to a |
|
// server |
|
#define PROC_NAME "nats_out" |
|
//#define DEBUG 1 |
|
#include <stdlib.h> |
|
#include <stdint.h> |
|
#include <stdio.h> |
|
#include <string.h> |
|
#include <unistd.h> |
|
// #include <stropts.h> |
|
// #include <poll.h> |
|
#include "waterslide.h" |
|
#include "waterslidedata.h" |
|
#include "procloader.h" |
|
#include "sysutil.h" |
|
#include "datatypes/wsdt_tuple.h" |
|
#include "datatypes/wsdt_fixedstring.h" |
|
#include "datatypes/wsdt_uint.h" |
|
#include "datatypes/wsdt_uint64.h" |
|
#include "datatypes/wsdt_double.h" |
|
#include <nats/nats.h> |
|
|
|
|
|
char proc_name[] = PROC_NAME; |
|
char proc_version[] = "0.1"; |
|
char *proc_tags[] = {"source", "output", NULL}; |
|
char *proc_alias[] = { NULL }; |
|
char proc_purpose[] = "writes messages to a Nats.io topic"; |
|
char proc_description[] = ""; |
|
|
|
proc_option_t proc_opts[] = { |
|
/* 'option character', "long option string", "option argument", |
|
"option description", <allow multiple>, <required>*/ |
|
{'u',"","url", |
|
"connection URL",0,0}, |
|
{'t',"","topic", |
|
"Topic name",0,0}, |
|
{'v',"","", |
|
"verbose output",0,0}, |
|
// {'p',"","port", |
|
// "Use listening port",0,0}, |
|
// {'w',"","", |
|
// "dont wait till data is available",0,0}, |
|
// {'b',"","", |
|
// "Use blocking sockets (default: non-blocking)", 0,0}, |
|
//the following must be left as-is to signify the end of the array |
|
{' ',"","", |
|
"",0,0} |
|
}; |
|
char proc_nonswitch_opts[] = ""; |
|
char *proc_input_types[] = {"any", NULL}; |
|
// (Potential) Output types: bin |
|
char *proc_output_types[] = {NULL}; |
|
char proc_requires[] = "nats_in should be used in a separately running process"; |
|
// Ports: |
|
proc_port_t proc_input_ports[] = {{NULL, NULL}}; |
|
char *proc_tuple_container_labels[] = {NULL}; |
|
char *proc_tuple_conditional_container_labels[] = {NULL}; |
|
char *proc_tuple_member_labels[] = {NULL}; |
|
// char *proc_synopsis[] = {"tcpcatch -h <hostname> -p <port> [-s] [-w] [-b]", NULL}; |
|
char *proc_synopsis[] = {"nats_out -u <url> -t <topic>", NULL}; |
|
proc_example_t proc_examples[] = { |
|
{"... | nats_out -u 'nats://localhost:4222' -t foo.bar ", "publish events"}, |
|
{NULL, NULL} |
|
}; |
|
|
|
//function prototypes for local functions |
|
static int proc_binary(void *, wsdata_t*, ws_doutput_t*, int); |
|
|
|
|
|
typedef struct _proc_instance_t { |
|
natsConnection *nc; |
|
natsSubscription *sub; |
|
natsMsg *msg; |
|
char *url; |
|
char *topic; |
|
int verbose; |
|
int cntReportInterval; |
|
uint64_t meta_process_cnt; |
|
uint64_t nextReportCnt; |
|
uint64_t loops; |
|
ws_outtype_t * outtype_bin; |
|
} proc_instance_t; |
|
|
|
static int proc_cmd_options(int argc, char ** argv, |
|
proc_instance_t * proc, void * type_table) { |
|
|
|
int op; |
|
// int server = 0; |
|
|
|
while ((op = getopt(argc, argv, "u:t:v")) != EOF) { |
|
switch (op) { |
|
case 'u': |
|
proc->url = strdup(optarg); |
|
break; |
|
case 't': |
|
proc->topic = strdup(optarg); |
|
tool_print("Topic: %s", proc->topic); |
|
break; |
|
case 'v': |
|
proc->verbose++; |
|
break; |
|
default: |
|
return 0; |
|
} |
|
} |
|
|
|
if (NULL == proc->url) { |
|
return 0; |
|
} |
|
|
|
return 1; |
|
} |
|
|
|
// the following is a function to take in command arguments and initalize |
|
// this processor's instance.. |
|
// also register as a source here.. |
|
// return 1 if ok |
|
// return 0 if fail |
|
int proc_init(wskid_t * kid, int argc, char ** argv, void ** vinstance, ws_sourcev_t * sv, |
|
void * type_table) { |
|
|
|
//allocate proc instance of this processor |
|
proc_instance_t * proc = |
|
(proc_instance_t*)calloc(1,sizeof(proc_instance_t)); |
|
*vinstance = proc; |
|
|
|
proc->loops = 0; |
|
proc->meta_process_cnt = 0; |
|
|
|
//read in command options |
|
if (!proc_cmd_options(argc, argv, proc, type_table)) { |
|
return 0; |
|
} |
|
|
|
// proc->outtype_bin = |
|
// ws_register_source_byname(type_table, "BINARY_TYPE", data_source, sv); |
|
|
|
natsStatus s; |
|
|
|
// s = natsOptions_SetErrorHandler(opts, asyncCb, NULL); |
|
|
|
// if (s == NATS_OK) |
|
// Connects to the NATS Server |
|
s = natsConnection_ConnectTo(&proc->nc, proc->url); |
|
|
|
// Simple synchronous subscriber |
|
if (s == NATS_OK) { |
|
return 1; |
|
} else { |
|
tool_print("Error: %d", s); |
|
return 0; |
|
} |
|
|
|
// natsConnection_SubscribeSync(&proc->sub, proc->nc, proc->topic); |
|
|
|
// return 1; |
|
} |
|
|
|
// this function needs to decide on processing function based on datatype |
|
// given.. also set output types as needed (unless a sink) |
|
//return 1 if ok |
|
// return 0 if problem |
|
proc_process_t proc_input_set(void * vinstance, wsdatatype_t * input_type, |
|
wslabel_t * port, |
|
ws_outlist_t* olist, int type_index, |
|
void * type_table) { |
|
//accept only binary datatypes |
|
if (wsdatatype_match(type_table, input_type, "BINARY_TYPE")) { |
|
return proc_binary; |
|
} |
|
|
|
return NULL; |
|
} |
|
|
|
//// proc processing function assigned to a specific data type in proc_io_init |
|
//return 1 if output is available |
|
// return 0 if not output |
|
// |
|
static int proc_binary(void * vinstance, wsdata_t* input_data, |
|
ws_doutput_t * dout, int type_index) { |
|
proc_instance_t * proc = (proc_instance_t*)vinstance; |
|
proc->meta_process_cnt++; |
|
|
|
if (proc->verbose && |
|
proc->meta_process_cnt >= proc->nextReportCnt) { |
|
// struct timeval ctime; |
|
// gettimeofday(&ctime, 0); |
|
|
|
// if (proc->meta_process_cnt > 1) { |
|
// double sec = ctime.tv_sec - proc->last_time.tv_sec + |
|
// ((double)(ctime.tv_usec - proc->last_time.tv_usec)) / |
|
// 1000000.0; |
|
// double rate = (proc->meta_process_cnt - proc->last_cnt) / sec; |
|
// tool_print("DUMP: %"PRIu64" records, %"PRIu64" sent, %"PRIu64 |
|
// " dropped in %0.2f sec (%lf records/sec)", |
|
// proc->meta_process_cnt - proc->last_cnt, |
|
// proc->tcpt->sent - proc->last_sent, |
|
// proc->tcpt->dropped - proc->last_dropped, |
|
// sec, rate); |
|
// } |
|
|
|
// proc->last_cnt = proc->meta_process_cnt; |
|
// proc->last_sent = proc->tcpt->sent; |
|
// proc->last_dropped = proc->tcpt->dropped; |
|
// proc->last_time = ctime; |
|
// proc->nextReportCnt = proc->meta_process_cnt + |
|
// proc->cntReportInterval; |
|
} |
|
|
|
wsdt_binary_t *bin = (wsdt_binary_t*) input_data->data; |
|
natsConnection_Publish(proc->nc, proc->topic, bin->buf, bin->len); |
|
return 1; |
|
|
|
} |
|
|
|
// int nat |
|
// //// proc processing function assigned to a specific data type in proc_io_init |
|
// //return 1 if output is available |
|
// // return 0 if not output |
|
// // |
|
// static int data_source(void * vinstance, wsdata_t* source_data, |
|
// ws_doutput_t * dout, int type_index) { |
|
// proc_instance_t * proc = (proc_instance_t*)vinstance; |
|
|
|
// proc->loops++; |
|
// natsStatus s; |
|
// s = natsSubscription_NextMsg(&proc->msg, proc->sub, 1000); |
|
// if (s == NATS_OK) { |
|
// // natsMsg *msg = proc->msg; |
|
// int msglen = natsMsg_GetDataLength(proc->msg); |
|
// tool_print("msglen: %d", msglen); |
|
// proc->meta_process_cnt += msglen; |
|
// // allocate binary dtype for Waterslide use |
|
// wsdata_t *outdata = dtype_alloc_binary(msglen); |
|
// if (!outdata) { |
|
// return 0; |
|
// } |
|
// wsdt_binary_t *bin = (wsdt_binary_t*)outdata->data; |
|
|
|
// memcpy(bin->buf, natsMsg_GetData(proc->msg), msglen); |
|
|
|
// ws_set_outdata(outdata, proc->outtype_bin, dout); |
|
|
|
// natsMsg_Destroy(proc->msg); |
|
// } |
|
|
|
// // proc->meta_process_cnt += tcp_catch_data(proc->tcpc, proc->outtype_bin, |
|
// // source_data, dout); |
|
// return 1; |
|
// } |
|
|
|
//return 1 if successful |
|
//return 0 if no.. |
|
int proc_destroy(void * vinstance) { |
|
proc_instance_t * proc = (proc_instance_t*)vinstance; |
|
tool_print("output cnt %" PRIu64, proc->meta_process_cnt); |
|
tool_print("polling loops cnt %" PRIu64, proc->loops); |
|
|
|
// TODO: Cleanup NATS connection/subscription |
|
// natsSubscription_Destroy(proc->sub); |
|
natsConnection_Destroy(proc->nc); |
|
// tcp_catch_destroy(proc->tcpc); |
|
|
|
//free dynamic allocations |
|
free(proc->url); |
|
free(proc); |
|
|
|
return 1; |
|
} |