Skip to content

Instantly share code, notes, and snippets.

@vyskocilm
Last active August 29, 2015 14:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vyskocilm/d463b039c4b9c307fe75 to your computer and use it in GitHub Desktop.
Save vyskocilm/d463b039c4b9c307fe75 to your computer and use it in GitHub Desktop.
test case for malamute issue 81
# Malamute configuration
# Apply to the whole broker
server
timeout = 10000 # Client connection timeout, msec
background = 0 # Run as background process
workdir = . # Working directory for daemon
verbose = 0 # Do verbose logging of activity?
auth
verbose = 1 # Debug authentication steps?
plain = passwords.cfg
# Apply to the Malamute service
mlm_server
echo = binding Malamute service to 'ipc://@/malamute'
security
mechanism = null
bind
endpoint = ipc://@/malamute
/*
* Test case for https://github.com/zeromq/malamute/issues/81
*
* Michal Vyskocil <michalvyskocil@eaton.com>, licensed under Public Domain
*
*/
#include <malamute.h>
#define ENDPOINT "ipc://@/malamute"
#define STREAM "stream"
#define TIMEOUT 500
static void
s_producer(zsock_t *pipe, void *args) {
int* p_id = (int*) args;
char* name;
asprintf(&name, "producer-%d", *p_id);
mlm_client_t *client = mlm_client_new();
assert (client);
int r = mlm_client_connect(client, ENDPOINT, 5000, name);
assert (r == 0);
mlm_client_set_producer(client, STREAM);
int cnt = 0;
zsock_signal(pipe, 0);
while(!zsys_interrupted) {
mlm_client_sendx(client, "SUBJECT", "hello", NULL);
zsys_debug("[%s] sent %d messages", name, ++cnt);
zclock_sleep(TIMEOUT);
}
mlm_client_destroy(&client);
zsock_destroy(&pipe);
}
static void
s_consumer(zsock_t *pipe, void *args) {
int* p_id = (int*) args;
char* name;
asprintf(&name, "consumer-%d", *p_id);
mlm_client_t *client = mlm_client_new();
assert (client);
int r = mlm_client_connect(client, ENDPOINT, 5000, name);
assert (r == 0);
mlm_client_set_consumer(client, STREAM, ".*");
int cnt = 0;
zsock_signal(pipe, 0);
while(!zsys_interrupted) {
char *subject, *msg;
mlm_client_recvx(client, &subject, &msg, NULL);
zsys_debug("got subject=%s, msg=%s", subject, msg);
free(subject);
free(msg);
}
mlm_client_destroy(&client);
zsock_destroy(&pipe);
}
int main() {
// Start a server to test against, and bind to endpoint
zactor_t *server = zactor_new (mlm_server, ENDPOINT);
zstr_sendx (server, "LOAD", "malamute.cfg", NULL);
int id = 1;
zactor_t *producer1 = zactor_new(s_producer, &id);
assert(producer1);
zactor_t *consumer1 = zactor_new(s_consumer, &id);
assert(consumer1);
zpoller_t *poller = zpoller_new(producer1, consumer1, NULL);
assert(poller);
while(!zpoller_terminated(poller)) {
zsock_t *which = zpoller_wait(poller, -1);
break;
}
zactor_destroy(&server);
zactor_destroy(&producer1);
zactor_destroy(&consumer1);
zpoller_destroy(&poller);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment