Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@nkottary
Created February 23, 2016 08:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nkottary/cdf745759b892338fb58 to your computer and use it in GitHub Desktop.
Save nkottary/cdf745759b892338fb58 to your computer and use it in GitHub Desktop.
#include <stdio.h>
#include <stdlib.h>
#include <malloc.h>
#include <string.h>
#include <stdint.h>
#include <unistd.h>
#include <zmq.h>
/* Look for memory leaks while repeatedly sending messages.
Compile like this: gcc -I/usr/local/include msgtest.c -L/usr/local/lib -lzmq -g -o mt
*/
/* ZeroMQ endpoint string that main() passes to zmq_bind() and zmq_unbind(). */
const char BINDING[] = "ipc:///tmp/abcd";
/* Payload for every message; send_message() copies sizeof(MESSAGE) bytes,
   i.e. the 26 letters plus the terminating NUL. */
const char MESSAGE[] = "abcdefghijklmnopqrstuvwxyz";
/* Print, on standard output, the textual description of the error code
   currently reported by zmq_errno(). */
void report_error()
{
    printf("Error returned by ZMQ: %s\n", zmq_strerror(zmq_errno()));
}
/* Deallocation callback handed to zmq_msg_init_data(): releases the
   malloc'ed message buffer `data`. The `hint` argument is not used. */
void freedata (void *data, void* hint)
{
    (void) hint;
    free(data);
}
/*
 * Return the virtual memory size of the current process as printed by
 * `ps -p <pid> -o vsz` (second output line; presumably in KiB, as is usual
 * for the vsz column -- confirm for the target platform).
 *
 * Returns 0 when no number could be parsed from the command's output.
 * Terminates the program with exit(1) if the command line does not fit its
 * buffer or the shell command cannot be started.
 */
int get_vmem_usage (void)
{
    char command[100];
    char line[100];
    int vmem = 0;

    /* Bounded formatting: never write past the end of `command`. */
    int len = snprintf(command, sizeof command,
                       "ps -p %ld -o vsz | sed -n 2p", (long) getpid());
    if (len < 0 || (size_t) len >= sizeof command) {
        printf("ERROR: Failed to build shell command.\n");
        exit(1);
    }

    FILE *fp = popen(command, "r");
    if (fp == NULL) {
        printf("ERROR: Failed to run shell command.\n");
        exit(1);
    }

    /* Keep the last line that actually parses as an integer. */
    while (fgets(line, sizeof line, fp) != NULL) {
        int value = 0;
        if (sscanf(line, "%d", &value) == 1) {
            vmem = value;
        }
    }

    pclose(fp);
    return vmem;
}
/*
 * Send one copy of MESSAGE (including its terminating NUL) on `socket`.
 *
 * A fresh heap buffer is allocated for every call and handed to ZeroMQ via
 * zmq_msg_init_data() together with freedata() as the deallocation callback.
 *
 * Any failure is reported on standard output and terminates the program
 * with exit(1).
 */
void send_message (void *socket)
{
    zmq_msg_t zmsg;

    void *data = malloc(sizeof(MESSAGE));
    if (data == NULL) {
        printf("Failed: malloc\n");
        exit(1);
    }
    memcpy(data, MESSAGE, sizeof(MESSAGE));

    if (zmq_msg_init_data(&zmsg, data, sizeof(MESSAGE), freedata, NULL) != 0) {
        printf("Failed: zmq_msg_init_data\n");
        report_error();
        free(data);     /* the message never took ownership of the buffer */
        exit(1);
    }

    /* zmq_msg_send() returns the number of bytes in the message on success
       and -1 on failure, so a non-zero result is NOT an error. */
    if (zmq_msg_send(&zmsg, socket, 0) == -1) {
        printf("Failed: zmq_msg_send\n");
        report_error();
        zmq_msg_close(&zmsg);   /* release the unsent message and its buffer */
        exit(1);
    }

    if (zmq_msg_close(&zmsg) != 0) {
        printf("Failed: zmq_msg_close\n");
        report_error();
        exit(1);
    }
}
/*
 * Leak test driver: create a PUB socket bound to BINDING, send one million
 * messages, and print the process's virtual memory usage every 100000
 * messages (starting with the first). Afterwards unbind, close the socket
 * and destroy the context.
 *
 * Returns 0 on success; any ZeroMQ failure is reported on standard output
 * and terminates the program with exit(1).
 */
int main (void)
{
    int rc = 0;

    void *ctx = zmq_ctx_new();
    if (ctx == NULL) {
        printf("Failed: zmq_ctx_new\n");
        report_error();
        exit(1);
    }

    void *socket = zmq_socket(ctx, ZMQ_PUB);
    if (socket == NULL) {
        printf("Failed: zmq_socket\n");
        report_error();
        exit(1);
    }

    int hwm = 1000;
    rc = zmq_setsockopt(socket, ZMQ_SNDHWM, &hwm, sizeof(hwm));
    if (rc != 0) {
        printf("Failed: zmq_setsockopt\n");
        report_error();
        exit(1);
    }

    rc = zmq_bind(socket, BINDING);
    if (rc != 0) {
        printf("Failed: zmq_bind\n");
        report_error();
        exit(1);
    }

    /* The counter must advance: without the increment the loop never ends
       and the usage report fires on every iteration. */
    for (int i = 0; i < 1000000; i++) {
        send_message(socket);
        if (i % 100000 == 0) {
            printf("VMEM usage is: %d\n", get_vmem_usage());
        }
    }

    rc = zmq_unbind(socket, BINDING);
    if (rc != 0) {
        printf("Failed: zmq_unbind\n");
        report_error();
        exit(1);
    }

    rc = zmq_close(socket);
    if (rc != 0) {
        printf("Failed: zmq_close\n");
        report_error();
        exit(1);
    }

    rc = zmq_ctx_destroy(ctx);
    if (rc != 0) {
        printf("Failed: zmq_ctx_destroy\n");
        report_error();
        exit(1);
    }

    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment