Skip to content

Instantly share code, notes, and snippets.

@vyskocilm
Last active May 2, 2016 12:05
Show Gist options
  • Save vyskocilm/19b3590884cce6fea9e47b067864acac to your computer and use it in GitHub Desktop.
Save vyskocilm/19b3590884cce6fea9e47b067864acac to your computer and use it in GitHub Desktop.
#   Malamute configuration
#   ZPL format: nesting is expressed by 4-space indentation, so the
#   indentation below is significant.

#   Apply to the whole broker
server
    timeout = 10000     #   Client connection timeout, msec
    background = 0      #   Run as background process
    workdir = .         #   Working directory for daemon
    verbose = 1         #   Do verbose logging of activity?
#    auth
#        verbose = 1     #   Debug authentication steps?
#        plain = passwords.cfg

#   Apply to the Malamute service
mlm_server
    echo = binding Malamute service to 'ipc://@/malamute'
#    security
#        mechanism = plain
    bind
        endpoint = ipc://@/malamute
#include <malamute.h>
// Sleep-broker test case: one producer and one consumer exchange messages
// over a Malamute stream.

// Broker endpoint the clients connect to.
static const char *const endpoint = "ipc://@/malamute";
// Name of the stream the producer publishes to and the consumer subscribes to.
static const char *const stream = "STREAM";
// Client address used by the producer when connecting to the broker.
static const char *const producer = "producer1";
// Client address used by the consumer when connecting to the broker.
static const char *const consumer = "consumer1";
//  Producer actor.
//
//  Connects to the broker at `endpoint` under the address `producer`,
//  registers as a producer on `stream`, then publishes the message
//  subject="subject", content="ehlo" about every 500 msec. The loop ends
//  when the process is interrupted or when the parent sends anything on
//  the actor pipe (zactor_destroy sends "$TERM").
//
//  pipe - actor pipe to the parent thread
//  args - unused
static void
s_producer (zsock_t *pipe, void *args)
{
    (void) args;

    mlm_client_t *client = mlm_client_new ();
    //mlm_client_set_verbose (client, true);
    int rc = client ? 0 : -1;
    if (rc >= 0)
        rc = mlm_client_connect (client, endpoint, 1000, producer);
    if (rc >= 0)
        rc = mlm_client_set_producer (client, stream);

    // Only the actor pipe is polled; it doubles as an interruptible sleep.
    zpoller_t *poller = rc >= 0 ? zpoller_new (pipe, NULL) : NULL;

    // Always complete the zactor_new handshake, even when setup failed,
    // so the parent is never left waiting for the ready signal.
    zsock_signal (pipe, 0);

    if (!poller) {
        zsys_error ("%s: cannot connect to %s", producer, endpoint);
        mlm_client_destroy (&client);
        return;
    }
    zsys_info ("%s connected", producer);

    while (!zsys_interrupted) {
        if (mlm_client_sendx (client, "subject", "ehlo", NULL) < 0)
            zsys_warning ("%s: sending to stream %s failed", producer, stream);

        // Wait up to 500 msec between messages; any traffic on the pipe
        // is treated as a request to terminate.
        if (zpoller_wait (poller, 500) == pipe)
            break;
    }

    zpoller_destroy (&poller);
    mlm_client_destroy (&client);
}
//  Consumer actor.
//
//  Connects to the broker at `endpoint` under the address `consumer`,
//  subscribes to every subject (".*") on `stream`, and logs each message
//  it receives. After each receive it sleeps 500 msec before logging, so
//  it consumes no faster than one message per 500 msec. The loop ends when
//  the process is interrupted or when the parent sends anything on the
//  actor pipe (zactor_destroy sends "$TERM").
//
//  pipe - actor pipe to the parent thread
//  args - unused
static void
s_consumer (zsock_t *pipe, void *args)
{
    (void) args;

    mlm_client_t *client = mlm_client_new ();
    //mlm_client_set_verbose (client, true);
    int rc = client ? 0 : -1;
    if (rc >= 0)
        rc = mlm_client_connect (client, endpoint, 1000, consumer);
    if (rc >= 0)
        rc = mlm_client_set_consumer (client, stream, ".*");

    zpoller_t *poller = NULL;
    if (rc >= 0)
        poller = zpoller_new (pipe, mlm_client_msgpipe (client), NULL);

    // Always complete the zactor_new handshake, even when setup failed,
    // so the parent is never left waiting for the ready signal.
    zsock_signal (pipe, 0);

    if (!poller) {
        zsys_error ("%s: cannot connect to %s", consumer, endpoint);
        mlm_client_destroy (&client);
        return;
    }
    zsys_info ("%s connected", consumer);

    while (!zsys_interrupted) {
        // Block with a finite timeout instead of polling with 0, which
        // returns immediately and turns this loop into a busy spin.
        void *which = zpoller_wait (poller, 500);
        if (!which)
            continue;           // timeout or interrupt; loop condition decides
        if (which == pipe)
            break;              // parent asked us to stop

        // Initialise both so a failed or short receive never leaves
        // indeterminate pointers for the logging and free calls below.
        char *subject = NULL;
        char *msg = NULL;
        if (mlm_client_recvx (client, &subject, &msg, NULL) < 0) {
            zstr_free (&subject);
            zstr_free (&msg);
            continue;
        }
        zclock_sleep (500);
        zsys_info ("subject=%s, msg=%s",
                   subject ? subject : "(null)",
                   msg ? msg : "(null)");
        zstr_free (&subject);
        zstr_free (&msg);
    }

    zpoller_destroy (&poller);
    mlm_client_destroy (&client);
}
int main ()
{
zactor_t *producer = zactor_new (s_producer, NULL);
zactor_t *consumer = zactor_new (s_consumer, NULL);
while (!zsys_interrupted)
{
zclock_sleep (500);
}
zactor_destroy (&consumer);
zactor_destroy (&producer);
}
#!/bin/sh
# Sleep-broker test driver: builds sbroker, runs it against a Malamute
# broker, freezes both processes with SIGSTOP for 12 seconds, then resumes
# them with SIGCONT and waits for them to exit.

MALAMUTE_PID=
SBROKER_PID=

# Resume and terminate whatever was started, so an early exit never leaves
# a running (or still-stopped) background process behind.
cleanup () {
    for pid in "${SBROKER_PID}" "${MALAMUTE_PID}"; do
        [ -n "${pid}" ] || continue
        # A stopped process only acts on SIGTERM once it is continued.
        kill -s CONT "${pid}" 2>/dev/null
        kill "${pid}" 2>/dev/null
    done
}
trap cleanup EXIT
trap 'exit 1' INT TERM

# Build before starting anything. Libraries are listed after the source
# file because linkers resolve symbols left to right (required with
# --as-needed toolchains).
gcc sbroker.c -o sbroker -lmlm -lczmq || exit 1

malamute malamute.cfg &
MALAMUTE_PID=$!
# Give the broker a moment to bind its endpoint before the clients connect.
sleep 1

./sbroker &
SBROKER_PID=$!

echo "let it exchange some messages"
sleep 2

echo "send SIGSTOP to ${MALAMUTE_PID} and ${SBROKER_PID}"
kill -s STOP "${MALAMUTE_PID}" || exit 1
kill -s STOP "${SBROKER_PID}" || exit 1

sleep 12

echo "send SIGCONT to ${MALAMUTE_PID} and ${SBROKER_PID}"
kill -s CONT "${MALAMUTE_PID}"
kill -s CONT "${SBROKER_PID}"

wait
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment