Skip to content

Instantly share code, notes, and snippets.

@hintjens
Last active October 29, 2015 09:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hintjens/501abc0f8eddc0428ce4 to your computer and use it in GitHub Desktop.
Save hintjens/501abc0f8eddc0428ce4 to your computer and use it in GitHub Desktop.
Test case for issue 1608
// Test case for #1608
#include <czmq.h>
#define BATCH_SIZE 5000
static void
sink_actor (zsock_t *pipe, void *args)
{
int latency [BATCH_SIZE];
int count = 0;
zsock_signal (pipe, 0);
bool terminated = false;
while (!terminated && !zsys_interrupted) {
char *command;
int64_t sent_usecs;
zsock_recv (pipe, "s8", &command, &sent_usecs);
if (streq (command, "$TERM"))
terminated = true;
else
if (streq (command, "DATA")) {
assert (count < BATCH_SIZE);
latency [count++] = (int) (zclock_usecs () - sent_usecs);
}
else
if (streq (command, "REPORT")) {
assert (count == BATCH_SIZE);
int index;
for (index = 0; index < BATCH_SIZE; index++)
zstr_sendf (pipe, "%d", latency [index]);
}
free (command);
}
}
static void
s_collect_latencies (char *label, zactor_t *actor, FILE *stream)
{
fprintf (stream, "%s", label);
zstr_send (actor, "REPORT");
int index;
for (index = 0; index < BATCH_SIZE; index++) {
char *latency = zstr_recv (actor);
fprintf (stream, ", %s", latency);
free (latency);
}
fprintf (stream, "\n");
}
int main (void)
{
// Remove HWM on pipes so we don't block on sending
zsys_set_pipehwm (0);
FILE *results = fopen ("results.csv", "w");
assert (results);
printf ("Test 1: send batch immediately...\n");
zactor_t *sink = zactor_new (sink_actor, NULL);
assert (sink);
int count;
for (count = 0; count < BATCH_SIZE; count++)
zsock_send (sink, "s8", "DATA", zclock_usecs ());
s_collect_latencies ("send", sink, results);
zactor_destroy (&sink);
printf ("Test 2: sleep 1 second, then send messages...\n");
sink = zactor_new (sink_actor, NULL);
assert (sink);
sleep (1);
for (count = 0; count < BATCH_SIZE; count++)
zsock_send (sink, "s8", "DATA", zclock_usecs ());
s_collect_latencies ("sleep-send", sink, results);
zactor_destroy (&sink);
printf ("Test 3: poll 1 second, then send messages...\n");
sink = zactor_new (sink_actor, NULL);
assert (sink);
zpoller_t *poller = zpoller_new (NULL);
zpoller_wait (poller, 1000);
for (count = 0; count < BATCH_SIZE; count++)
zsock_send (sink, "s8", "DATA", zclock_usecs ());
s_collect_latencies ("poll-send", sink, results);
zpoller_destroy (&poller);
zactor_destroy (&sink);
printf ("Test 4: send batch with sleep delay...\n");
sink = zactor_new (sink_actor, NULL);
assert (sink);
for (count = 0; count < BATCH_SIZE; count++) {
zsock_send (sink, "s8", "DATA", zclock_usecs ());
zclock_sleep (1); // One msec
}
s_collect_latencies ("sendsleep", sink, results);
zactor_destroy (&sink);
printf ("Test 5: send batch with poll delay...\n");
sink = zactor_new (sink_actor, NULL);
assert (sink);
poller = zpoller_new (NULL);
for (count = 0; count < BATCH_SIZE; count++) {
zsock_send (sink, "s8", "DATA", zclock_usecs ());
zpoller_wait (poller, 1);
}
s_collect_latencies ("sendpoll", sink, results);
zpoller_destroy (&poller);
zactor_destroy (&sink);
printf ("Test 6: sleep, then send batch with sleep delay...\n");
sink = zactor_new (sink_actor, NULL);
assert (sink);
sleep (1);
for (count = 0; count < BATCH_SIZE; count++) {
zsock_send (sink, "s8", "DATA", zclock_usecs ());
zclock_sleep (1); // One msec
}
s_collect_latencies ("sleep-sendsleep", sink, results);
zactor_destroy (&sink);
printf ("Test 7: delay, then send batch with poll delay...\n");
sink = zactor_new (sink_actor, NULL);
assert (sink);
poller = zpoller_new (NULL);
zpoller_wait (poller, BATCH_SIZE);
for (count = 0; count < BATCH_SIZE; count++) {
zsock_send (sink, "s8", "DATA", zclock_usecs ());
zpoller_wait (poller, 1);
}
s_collect_latencies ("poll-sendpoll", sink, results);
zpoller_destroy (&poller);
zactor_destroy (&sink);
fclose (results);
return 0;
}
// Test case for #1608
// This sends with varying pauses between messages
#include <czmq.h>
#define BATCH_SIZE 1000
static void
sink_actor (zsock_t *pipe, void *args)
{
int latency [BATCH_SIZE];
int count = 0;
zsock_signal (pipe, 0);
bool terminated = false;
while (!terminated && !zsys_interrupted) {
char *command;
int64_t sent_usecs;
zsock_recv (pipe, "s8", &command, &sent_usecs);
if (streq (command, "$TERM"))
terminated = true;
else
if (streq (command, "DATA")) {
assert (count < BATCH_SIZE);
latency [count++] = (int) (zclock_usecs () - sent_usecs);
}
else
if (streq (command, "REPORT")) {
assert (count == BATCH_SIZE);
int index;
for (index = 0; index < BATCH_SIZE; index++)
zstr_sendf (pipe, "%d", latency [index]);
}
free (command);
}
}
static void
s_collect_latencies (char *label, zactor_t *actor, FILE *stream)
{
fprintf (stream, "%s", label);
zstr_send (actor, "REPORT");
int index;
for (index = 0; index < BATCH_SIZE; index++) {
char *latency = zstr_recv (actor);
fprintf (stream, ", %s", latency);
free (latency);
}
fprintf (stream, "\n");
}
int main (void)
{
// Remove HWM on pipes so we don't block on sending
zsys_set_pipehwm (0);
FILE *results = fopen ("results.csv", "w");
assert (results);
zactor_t *sink;
int count;
printf ("Send batch with 1 msec delay...\n");
sink = zactor_new (sink_actor, NULL);
assert (sink);
for (count = 0; count < BATCH_SIZE; count++) {
zsock_send (sink, "s8", "DATA", zclock_usecs ());
zclock_sleep (1);
}
s_collect_latencies ("delay1", sink, results);
zactor_destroy (&sink);
printf ("Send batch with 8 msec delay...\n");
sink = zactor_new (sink_actor, NULL);
assert (sink);
for (count = 0; count < BATCH_SIZE; count++) {
zsock_send (sink, "s8", "DATA", zclock_usecs ());
zclock_sleep (8);
}
s_collect_latencies ("delay8", sink, results);
zactor_destroy (&sink);
printf ("Send batch with 32 msec delay...\n");
sink = zactor_new (sink_actor, NULL);
assert (sink);
for (count = 0; count < BATCH_SIZE; count++) {
zsock_send (sink, "s8", "DATA", zclock_usecs ());
zclock_sleep (64);
}
s_collect_latencies ("delay32", sink, results);
zactor_destroy (&sink);
printf ("Send batch with 64 msec delay...\n");
sink = zactor_new (sink_actor, NULL);
assert (sink);
for (count = 0; count < BATCH_SIZE; count++) {
zsock_send (sink, "s8", "DATA", zclock_usecs ());
zclock_sleep (64);
}
s_collect_latencies ("delay64", sink, results);
zactor_destroy (&sink);
fclose (results);
return 0;
}
// Test case for #1608
// This sends with varying message sizes
#include <czmq.h>
#define BATCH_SIZE 1000
#define SEND_DELAY 10 // msec
static void
sink_actor (zsock_t *pipe, void *args)
{
int latency [BATCH_SIZE];
int count = 0;
zsock_signal (pipe, 0);
bool terminated = false;
while (!terminated && !zsys_interrupted) {
char *command;
int64_t sent_usecs;
zsock_recv (pipe, "s8", &command, &sent_usecs);
if (streq (command, "$TERM"))
terminated = true;
else
if (streq (command, "DATA")) {
assert (count < BATCH_SIZE);
latency [count++] = (int) (zclock_usecs () - sent_usecs);
}
else
if (streq (command, "REPORT")) {
assert (count == BATCH_SIZE);
int index;
for (index = 0; index < BATCH_SIZE; index++)
zstr_sendf (pipe, "%d", latency [index]);
}
free (command);
}
}
static void
run_testcase (char *label, int size, FILE *stream)
{
// Remove HWM on pipes so we don't block on sending
zsys_set_pipehwm (0);
printf ("Test payload size=%d: ", size);
zactor_t *sink = zactor_new (sink_actor, NULL);
char *payload = malloc (size);
assert (sink);
int count;
for (count = 0; count < BATCH_SIZE; count++) {
zsock_send (sink, "s8s", "DATA", zclock_usecs (), payload);
zclock_sleep (SEND_DELAY);
if (count % 100 == 0) {
printf (".");
fflush (stdout);
}
}
printf ("\n");
fprintf (stream, "%s", label);
zstr_send (sink, "REPORT");
int index;
for (index = 0; index < BATCH_SIZE; index++) {
char *latency = zstr_recv (sink);
fprintf (stream, ", %s", latency);
free (latency);
}
fprintf (stream, "\n");
free (payload);
zactor_destroy (&sink);
}
int main (void)
{
FILE *results = fopen ("results.csv", "w");
assert (results);
run_testcase ("data0", 0, results);
run_testcase ("data10", 10, results);
run_testcase ("data100", 100, results);
run_testcase ("data1K", 1000, results);
run_testcase ("data10K", 10000, results);
fclose (results);
return 0;
}
// Test case for #1608
// This uses CLIENT and SERVER sockets
#include <czmq.h>
#define BATCH_SIZE 1000
#define SEND_DELAY 10 // msec
static void
sink_actor (zsock_t *pipe, void *args)
{
int latency [BATCH_SIZE];
zsock_t *server = zsock_new_server ((char *) args);
zsock_signal (pipe, 0);
bool terminated = false;
while (!terminated && !zsys_interrupted) {
char *command = zstr_recv (pipe);
if (streq (command, "$TERM"))
terminated = true;
else
if (streq (command, "START")) {
int count;
for (count = 0; count < BATCH_SIZE; count++) {
int64_t sent_usecs;
zsock_recv (server, "8", &sent_usecs);
latency [count++] = (int) (zclock_usecs () - sent_usecs);
}
}
else
if (streq (command, "REPORT")) {
FILE *results = fopen ("results.csv", "a");
char *label = zstr_recv (pipe);
fprintf (results, "%s", label);
free (label);
int count;
for (count = 0; count < BATCH_SIZE; count++)
fprintf (results, ",%d", latency [count]);
fprintf (results, "\n");
fclose (results);
}
free (command);
}
zsock_destroy (&server);
}
static void
run_testcase (char *label, char *endpoint)
{
zactor_t *sink = zactor_new (sink_actor, endpoint);
assert (sink);
zsock_t *client = zsock_new_client (endpoint);
assert (client);
zstr_send (sink, "START");
int count;
for (count = 0; count < BATCH_SIZE; count++) {
zsock_send (client, "8", zclock_usecs ());
zclock_sleep (SEND_DELAY);
if (count % 100 == 0) {
printf (".");
fflush (stdout);
}
}
printf ("\n");
zstr_send (sink, "REPORT");
zstr_send (sink, label);
zactor_destroy (&sink);
zsock_destroy (&client);
}
int main (void)
{
// Don't block on sending test data
zsys_set_sndhwm (0);
zsys_set_rcvhwm (0);
remove ("results.csv");
run_testcase ("TCP", "tcp://127.0.0.1:10001");
run_testcase ("IPC", "ipc://@/testcase4");
run_testcase ("inproc", "inproc://testcase4");
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment