Skip to content

Instantly share code, notes, and snippets.

@mboeh
Created November 29, 2013 00:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mboeh/7700041 to your computer and use it in GitHub Desktop.
Save mboeh/7700041 to your computer and use it in GitHub Desktop.
C version of ZeroMQ experiment.
#include <czmq.h>
#include <bstrlib.h>
const char* shouter_addr = "inproc://shouter";
const char* listener_addr = "inproc://listener";
const char* ui_addr = "inproc://ui";
// Simulation Thread
typedef struct simulation_arg_t
{
// Input parameters
bstring label;
// Internal context
zctx_t* context;
void* shoutsock;
void* listsock;
void* uisock;
bstring lastmsg;
} simulation_arg_t;
void teardown_simulation_args(simulation_arg_t **args_p)
{
simulation_arg_t *args = *args_p;
bdestroy(args->label);
bdestroy(args->lastmsg);
if (args->shoutsock) zsocket_destroy(args->context, args->shoutsock);
if (args->listsock) zsocket_destroy(args->context, args->listsock);
if (args->uisock) zsocket_destroy(args->context, args->uisock);
free(*args_p);
*args_p = NULL;
}
void simulation_th(void *args_v, zctx_t *context, void *pipe)
{
simulation_arg_t *args = args_v;
args->context = context;
// Open connections to other threads
args->shoutsock = zsocket_new(args->context, ZMQ_PAIR);
zsocket_bind(args->shoutsock, shouter_addr);
args->listsock = zsocket_new(args->context, ZMQ_PAIR);
zsocket_bind(args->listsock, listener_addr);
args->uisock = zsocket_new(args->context, ZMQ_PAIR);
zsocket_connect(args->uisock, ui_addr);
// Set up poller
zpoller_t* poller = zpoller_new(args->uisock, args->listsock, NULL);
int ticks = 0;
args->lastmsg = bfromcstr("nothing");
while (1) {
void* sock = zpoller_wait(poller, 1000/60);
if (sock == args->uisock) {
char* msg = zstr_recv(sock);
bstring outmsg = bformat("%s: %d: %s? %s!",
args->label,
ticks,
args->lastmsg->data,
msg);
zstr_send(args->shoutsock, outmsg->data);
bdestroy(outmsg);
free(msg);
} else if (sock == args->listsock) {
char* msg = zstr_recv(sock);
bassigncstr(args->lastmsg, msg);
free(msg);
}
ticks += 1;
}
zpoller_destroy(&poller);
teardown_simulation_args(&args);
zstr_send(pipe, "done");
}
void* start_simulation(zctx_t* context, const char* label)
{
simulation_arg_t *args = calloc(1, sizeof(simulation_arg_t));
if (args == NULL) return NULL;
args->label = bfromcstr(label);
if (args->label == NULL) return NULL;
return zthread_fork(context, simulation_th, args);
}
// Listener Thread
typedef struct listener_arg_t
{
// Input parameters
bstring port;
// Internal context
zctx_t* context;
void* recvsock;
void* simsock;
} listener_arg_t;
void teardown_listener_args(listener_arg_t **args_p)
{
listener_arg_t *args = *args_p;
bdestroy(args->port);
if (args->simsock) zsocket_destroy(args->context, args->simsock);
if (args->recvsock) zsocket_destroy(args->context, args->recvsock);
free(*args_p);
*args_p = NULL;
}
void listener_th(void *args_v, zctx_t *context, void *pipe)
{
listener_arg_t *args = args_v;
args->context = context;
// Open connections to other threads
bstring recvaddr = bformat("tcp://localhost:%s", args->port->data);
args->recvsock = zsocket_new(args->context, ZMQ_SUB);
zsocket_connect(args->recvsock, recvaddr->data);
zmq_setsockopt(args->recvsock, ZMQ_SUBSCRIBE, "", 0);
args->simsock = zsocket_new(args->context, ZMQ_PAIR);
zsocket_connect(args->simsock, listener_addr);
while (1) {
char* msg = zstr_recv(args->recvsock);
if(msg == NULL) break;
printf("HEARING >> %s\n", msg);
// Split it by ?...
bstring msgstr = bfromcstr(msg);
struct bstrList* strlist = bsplit(msgstr, '?');
// Save the last bit after ?...
bassign(msgstr, strlist->entry[strlist->qty - 1]);
// Chop off the final !
btrunc(msgstr, blength(msgstr) - 1);
// Send it on, finally
zstr_send(args->simsock, msgstr->data);
bstrListDestroy(strlist);
bdestroy(msgstr);
free(msg);
}
teardown_listener_args(&args);
zstr_send(pipe, "done");
}
void* start_listener(zctx_t* context, const char* port)
{
listener_arg_t *args = calloc(1, sizeof(listener_arg_t));
if (args == NULL) return NULL;
args->port = bfromcstr(port);
if (args->port == NULL) return NULL;
return zthread_fork(context, listener_th, args);
}
// Shouter Thread
typedef struct shouter_arg_t
{
// Input parameters
bstring port;
// Internal context
zctx_t* context;
void* sendsock;
void* simsock;
} shouter_arg_t;
void teardown_shouter_args(shouter_arg_t **args_p)
{
shouter_arg_t *args = *args_p;
bdestroy(args->port);
if (args->simsock) zsocket_destroy(args->context, args->simsock);
if (args->sendsock) zsocket_destroy(args->context, args->sendsock);
free(*args_p);
*args_p = NULL;
}
void shouter_th(void *args_v, zctx_t *context, void *pipe)
{
shouter_arg_t *args = args_v;
args->context = context;
// Open connections to other threads
bstring sendaddr = bformat("tcp://*:%s", args->port->data);
args->sendsock = zsocket_new(args->context, ZMQ_PUB);
zsocket_bind(args->sendsock, sendaddr->data);
args->simsock = zsocket_new(args->context, ZMQ_PAIR);
int ok = zsocket_connect(args->simsock, shouter_addr);
while (1) {
char* str = zstr_recv(args->simsock);
zstr_send(args->sendsock, str);
free(str);
}
teardown_shouter_args(&args);
zstr_send(pipe, "done");
}
void* start_shouter(zctx_t* context, const char* port)
{
shouter_arg_t *args = calloc(1, sizeof(shouter_arg_t));
if (args == NULL) return NULL;
args->port = bfromcstr(port);
if (args->port == NULL) return NULL;
return zthread_fork(context, shouter_th, args);
}
int main(int argc, char *argv [])
{
zctx_t *context = zctx_new();
bstring inport_s = bfromcstr(argv[1]);
bstring outport_s = bfromcstr(argv[2]);
void* inputsock = zsocket_new(context, ZMQ_PAIR);
zsocket_bind(inputsock, ui_addr);
void* simpipe = start_simulation(context, "a thread");
void* listpipe = start_listener(context, inport_s->data);
void* shoutpipe = start_shouter(context, outport_s->data);
while (1) {
bstring input = bgets((bNgetc) fgetc, stdin, '\n');
btrimws(input);
zstr_send(inputsock, input->data);
bdestroy(input);
}
zctx_destroy(&context);
bdestroy(inport_s);
bdestroy(outport_s);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment