Skip to content

Instantly share code, notes, and snippets.

@chuckremes
Created March 1, 2011 00:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chuckremes/848355 to your computer and use it in GitHub Desktop.
Save chuckremes/848355 to your computer and use it in GitHub Desktop.
//
// Leak example using PUB/SUB pattern
//
#include "zhelpers.h"
#include "zmsg.c"
// ---------------------------------------------------------------------
// This is our client task
// It creates a socket, sends one message and closes the socket. It does
// this 9000 times and then goes to sleep.
// When LINGER is set to any number greater than 0, the memory locked up
// by zmq_connect() is *never* released. When LINGER is 0, the memory
// is *slowly* released over the course of a few minutes.
//
// This example will create and destroy approximately 36k sockets before
// it sleeps.
//
// Publisher thread body (pthread start routine).
// Repeatedly opens a PUB socket with ZMQ_LINGER = 1, connects it to
// tcp://127.0.0.1:5555, sends the current sequence number as text and
// closes the socket again. After every 9000 sockets it calls
// s_sleep (200000).
//
// args is unused. The return value is always NULL.
// The loop only ends if a socket cannot be created or configured; in
// that case the error is reported on stderr and the context is
// terminated before the thread exits.
static void *
client_task (void *args) {
    (void) args;
    void *context = zmq_init (1);
    if (context == NULL) {
        perror ("client: zmq_init");
        return (NULL);
    }
    int sequence = 0;
    while (1) {
        void *client = zmq_socket (context, ZMQ_PUB);
        if (client == NULL) {
            // Do not keep calling into libzmq with a NULL socket
            perror ("client: zmq_socket");
            break;
        }
        int linger = 1;
        if (zmq_setsockopt (client, ZMQ_LINGER, &linger, sizeof (linger)) != 0
        ||  zmq_connect (client, "tcp://127.0.0.1:5555") != 0) {
            perror ("client: zmq_setsockopt/zmq_connect");
            zmq_close (client);
            break;
        }
        char string [20];
        // Bounded formatting; 20 bytes is enough for any int in decimal
        snprintf (string, sizeof (string), "%d", sequence);
        sequence += 1;
        s_send (client, string);
        // Close the socket right away; this churn is the point of the example
        zmq_close (client);
        if (sequence % 9000 == 0)
            s_sleep (200000);
    }
    // Only reached after a failure above
    zmq_term (context);
    return (NULL);
}
// ---------------------------------------------------------------------
// Subscriber; just throws away all messages it receives.
static void *server_worker (void *socket);
// Subscriber thread body (pthread start routine).
// Creates a SUB socket with an empty subscription filter, binds it to
// tcp://*:5555 and then receives and frees messages forever.
//
// args is unused. The return value is always NULL.
// If the context or socket cannot be created, or the socket cannot be
// subscribed/bound, the error is reported on stderr, whatever was
// created is released, and the thread returns immediately.
void *server_task (void *args) {
    (void) args;
    void *context = zmq_init (1);
    if (context == NULL) {
        perror ("server: zmq_init");
        return (NULL);
    }
    void *server = zmq_socket (context, ZMQ_SUB);
    if (server == NULL) {
        perror ("server: zmq_socket");
        zmq_term (context);
        return (NULL);
    }
    if (zmq_setsockopt (server, ZMQ_SUBSCRIBE, "", 0) != 0
    ||  zmq_bind (server, "tcp://*:5555") != 0) {
        // Without a working bind the clients would publish to nobody
        perror ("server: zmq_setsockopt/zmq_bind");
        zmq_close (server);
        zmq_term (context);
        return (NULL);
    }
    while (1) {
        // Discard every message; free (NULL) is harmless if nothing came back
        char *string = s_recv (server);
        free (string);
    }
    // Not reached: the loop above never exits
    zmq_close (server);
    zmq_term (context);
    return (NULL);
}
// This main thread simply starts several clients, and a server, and then
// waits for the server to finish.
//
// Program entry point.
// Starts four client threads and one server thread, then blocks until
// the server thread finishes. The client threads are never joined.
// Returns 0 once the server thread has ended, or 1 if any thread could
// not be started.
int main (void) {
    enum { CLIENT_COUNT = 4 };

    s_version_assert (2, 1);
    pthread_t client_thread;
    int count;
    for (count = 0; count < CLIENT_COUNT; count++) {
        // The handle is overwritten on purpose: clients are not joined
        int rc = pthread_create (&client_thread, NULL, client_task, NULL);
        if (rc != 0) {
            fprintf (stderr, "pthread_create (client %d) failed: error %d\n",
                     count, rc);
            return 1;
        }
    }
    pthread_t server_thread;
    int rc = pthread_create (&server_thread, NULL, server_task, NULL);
    if (rc != 0) {
        // Joining an uninitialized pthread_t would be undefined behavior
        fprintf (stderr, "pthread_create (server) failed: error %d\n", rc);
        return 1;
    }
    pthread_join (server_thread, NULL);
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment