Skip to content

Instantly share code, notes, and snippets.

@hintjens
Created September 1, 2015 18:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hintjens/d54d25c30e0339fb0a87 to your computer and use it in GitHub Desktop.
Save hintjens/d54d25c30e0339fb0a87 to your computer and use it in GitHub Desktop.
/* =========================================================================
zyre_perf - bulk test tool
This tool starts a number R of responders, then sends M messages to each
responder, and waits for responses.
This test runs in a single process and does the following:
A master thread sends N "shout" messages to R responders
Each responder sends a unicast "whisper" response back to the master
The master counts up the received messages
This file is part of Zyre, an open-source framework for proximity-based
peer-to-peer applications -- See http://zyre.org.
This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=========================================================================
*/
#include "../include/zyre.h"
static void
node_actor (zsock_t *pipe, void *args)
{
zyre_t *node = zyre_new (NULL);
assert (node);
zyre_set_endpoint (node, "inproc://%s", (char *) args);
free (args);
// Connect to test hub
zyre_gossip_connect (node, "inproc://zyre-hub");
zyre_start (node);
zyre_join (node, "GLOBAL");
// Actor is ready for work
zsock_signal (pipe, 0);
int counter = 0;
char *to_peer = NULL; // Either of these set,
char *to_group = NULL; // and we set a message
zpoller_t *poller = zpoller_new (pipe, zyre_socket (node), NULL);
while (true) {
void *which = zpoller_wait (poller, -1);
if (!which)
break; // Interrupted
// $TERM from parent means exit; anything else is breach of
// contract so we should assert
if (which == pipe) {
char *command = zstr_recv (pipe);
assert (streq (command, "$TERM"));
zstr_free (&command);
break; // Finished
}
// Process events from node
if (which == zyre_socket (node)) {
zyre_event_t *event = zyre_event_new (node);
if (!event)
break; // Interrupted
// Reply to a SHOUT with a WHISPER
if (zyre_event_type (event) == ZYRE_EVENT_SHOUT)
zyre_whispers (node, zyre_event_sender (event), "World");
zyre_event_destroy (&event);
}
}
zpoller_destroy (&poller);
zyre_destroy (&node);
}
int main (int argc, char *argv [])
{
// Syntax: zyre_perf R M D
// R = number of responders (10)
// M = number of messages to each responder (1000)
// D = delay in msec between sends (0)
int nbr_peers = argc > 1? atoi (argv [1]): 10;
int nbr_sends = argc > 2? atoi (argv [2]): 1000;
int send_delay = argc > 3? atoi (argv [3]): 0;
// Create master node, and use as gossip hub
zyre_t *master = zyre_new (NULL);
assert (master);
zyre_set_endpoint (master, "inproc://master");
zyre_gossip_bind (master, "inproc://zyre-hub");
int rc = zyre_start (master);
assert (rc == 0);
zyre_join (master, "GLOBAL");
// We address nodes as an array of actors
zactor_t **actors = (zactor_t **) zmalloc (sizeof (zactor_t *) * nbr_peers);
// Start peers, each as a separate thread
int index;
zsys_info ("Starting %d peers...", nbr_peers);
for (index = 0; index < nbr_peers; index++) {
char node_name [10];
sprintf (node_name, "node-%d", index);
actors [index] = zactor_new (node_actor, strdup (node_name));
}
// Collect JOIN events to know when all peers are ready
int nbr_ready = 0;
while (!zsys_interrupted && nbr_ready < nbr_peers) {
zyre_event_t *event = zyre_event_new (master);
if (!event)
break; // Interrupted
if (zyre_event_type (event) == ZYRE_EVENT_JOIN)
nbr_ready++;
zyre_event_destroy (&event);
}
// Broadcast the messages to peers
zsys_info ("Sending %d broadcasts...", nbr_sends);
for (index = 0; index < nbr_sends; index++) {
zyre_shouts (master, "GLOBAL", "Hello");
zclock_sleep (send_delay);
}
int balance = nbr_sends * nbr_peers;
zsys_info ("Expecting %d replies...", balance);
int64_t expiry = zclock_mono () + 10000;
while (!zsys_interrupted && zclock_mono () < expiry && balance > 0) {
zyre_event_t *event = zyre_event_new (master);
if (!event)
break; // Interrupted
if (zyre_event_type (event) == ZYRE_EVENT_WHISPER)
balance--;
zyre_event_destroy (&event);
}
zsys_info ("Received %d replies (%d lost)...", (nbr_sends * nbr_peers) - balance, balance);
// Stop all peer actors
for (index = 0; index < nbr_peers; index++) {
if (actors [index])
zactor_destroy (&actors [index]);
}
free (actors);
zyre_destroy (&master);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment