Skip to content

Instantly share code, notes, and snippets.

@hintjens
Created February 21, 2011 12:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save hintjens/837011 to your computer and use it in GitHub Desktop.
Save hintjens/837011 to your computer and use it in GitHub Desktop.
Provoke assertion during zmq_close
Requires quite a few attempts, and then >BOOM<:
ph@ws200901:~/work/zguide/examples/C$ for a in *.c; do mtrelay; done
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Segmentation fault (core dumped)
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Segmentation fault (core dumped)
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Test successful!
Program terminated with signal 11, Segmentation fault.
#0 zmq::object_t::send_reap (this=0x7fb000, socket_=0x7fb000) at object.cpp:357
357 cmd.destination = ctx->get_reaper ();
(gdb) bt
#0 zmq::object_t::send_reap (this=0x7fb000, socket_=0x7fb000) at object.cpp:357
#1 0x00007ff35ad1f42c in zmq::socket_base_t::close (this=0x7fb000) at socket_base.cpp:576
#2 0x00007ff35ad2881e in zmq_close (s_=0x7fb000) at zmq.cpp:303
#3 0x0000000000401006 in step1 (args=0x7ff35896ae20) at mtrelay.c:24
#4 0x00007ff35aaeb9ca in start_thread (arg=<value optimized out>) at pthread_create.c:300
#5 0x00007ff35a18470d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:112
#6 0x0000000000000000 in ?? ()
(gdb)
//
// Multithreaded relay
//
// Changes for 2.1:
// - added version assertion
// - pass context & thread to child threads
// - create socket pair for inproc communications
// - close sockets in each child thread
//
#include "zhelpers.h"
// This is the structure we pass to child threads
typedef struct {
void *context;
void *socket;
} thread_args_t;
static void *
step1 (void *args) {
thread_args_t *input = (thread_args_t *) args;
// Signal downstream to step 2
s_send (input->socket, "");
zmq_close (input->socket);
return NULL;
}
static void *
step2 (void *args) {
thread_args_t *input = (thread_args_t *) args;
// Create socket pair, then pass one socket to upstream thread
thread_args_t child;
void *receiver = zmq_socket (input->context, ZMQ_PAIR);
zmq_bind (receiver, "inproc://step2");
child.context = input->context;
child.socket = zmq_socket (input->context, ZMQ_PAIR);
zmq_connect (child.socket, "inproc://step2");
pthread_t thread;
pthread_create (&thread, NULL, step1, &child);
// Wait for signal
char *string = s_recv (receiver);
free (string);
zmq_close (receiver);
// Signal downstream to step 3
s_send (input->socket, "");
zmq_close (input->socket);
return NULL;
}
int main () {
s_version_assert (2, 1);
void *context = zmq_init (1);
// Create socket pair, then pass one socket to upstream thread
thread_args_t child;
void *receiver = zmq_socket (context, ZMQ_PAIR);
zmq_bind (receiver, "inproc://step3");
child.context = context;
child.socket = zmq_socket (context, ZMQ_PAIR);
zmq_connect (child.socket, "inproc://step3");
pthread_t thread;
pthread_create (&thread, NULL, step2, &child);
// Wait for signal
char *string = s_recv (receiver);
free (string);
printf ("Test successful!\n");
zmq_close (receiver);
zmq_term (context);
return 0;
}
/* =========================================================================
zhelpers.h
Helper header file for example applications.
-------------------------------------------------------------------------
Copyright (c) 1991-2010 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
This is free software; you can redistribute it and/or modify it under the
terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option)
any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABIL-
ITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
=========================================================================
*/
#ifndef __ZHELPERS_H_INCLUDED__
#define __ZHELPERS_H_INCLUDED__
// Include a bunch of headers that we will need in the examples
#include <zmq.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
#include <assert.h>
// Bring Windows MSVC up to C99 scratch
#if (defined (__WINDOWS__))
typedef unsigned long ulong;
typedef unsigned int uint;
typedef __int64 int64_t;
#endif
// Provide random number from 0..(num-1)
#define randof(num) (int) ((float) (num) * random () / (RAND_MAX + 1.0))
// Receive 0MQ string from socket and convert into C string
// Caller must free returned string.
static char *
s_recv (void *socket) {
zmq_msg_t message;
zmq_msg_init (&message);
if (zmq_recv (socket, &message, 0))
exit (1); // Context terminated, exit
int size = zmq_msg_size (&message);
char *string = malloc (size + 1);
memcpy (string, zmq_msg_data (&message), size);
zmq_msg_close (&message);
string [size] = 0;
return (string);
}
// Convert C string to 0MQ string and send to socket
static int
s_send (void *socket, char *string) {
int rc;
zmq_msg_t message;
zmq_msg_init_size (&message, strlen (string));
memcpy (zmq_msg_data (&message), string, strlen (string));
rc = zmq_send (socket, &message, 0);
assert (!rc);
zmq_msg_close (&message);
return (rc);
}
// Sends string as 0MQ string, as multipart non-terminal
static int
s_sendmore (void *socket, char *string) {
int rc;
zmq_msg_t message;
zmq_msg_init_size (&message, strlen (string));
memcpy (zmq_msg_data (&message), string, strlen (string));
rc = zmq_send (socket, &message, ZMQ_SNDMORE);
zmq_msg_close (&message);
assert (!rc);
return (rc);
}
// Receives all message parts from socket, prints neatly
//
static void
s_dump (void *socket)
{
puts ("----------------------------------------");
while (1) {
// Process all parts of the message
zmq_msg_t message;
zmq_msg_init (&message);
zmq_recv (socket, &message, 0);
// Dump the message as text or binary
char *data = zmq_msg_data (&message);
int size = zmq_msg_size (&message);
int is_text = 1;
int char_nbr;
for (char_nbr = 0; char_nbr < size; char_nbr++)
if ((unsigned char) data [char_nbr] < 32
|| (unsigned char) data [char_nbr] > 127)
is_text = 0;
printf ("[%03d] ", size);
for (char_nbr = 0; char_nbr < size; char_nbr++) {
if (is_text)
printf ("%c", data [char_nbr]);
else
printf ("%02X", (unsigned char) data [char_nbr]);
}
printf ("\n");
int64_t more; // Multipart detection
size_t more_size = sizeof (more);
zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
zmq_msg_close (&message);
if (!more)
break; // Last message part
}
}
// Set simple random printable identity on socket
//
static void
s_set_id (void *socket)
{
char identity [10];
sprintf (identity, "%04X-%04X", randof (0x10000), randof (0x10000));
zmq_setsockopt (socket, ZMQ_IDENTITY, identity, strlen (identity));
}
// Report 0MQ version number
//
static void
s_version (void)
{
int major, minor, patch;
zmq_version (&major, &minor, &patch);
printf ("Current 0MQ version is %d.%d.%d\n", major, minor, patch);
}
// Require at least some specified version
static void
s_version_assert (int want_major, int want_minor)
{
int major, minor, patch;
zmq_version (&major, &minor, &patch);
if (major < want_major
|| (major == want_major && minor < want_minor)) {
printf ("Current 0MQ version is %d.%d\n", major, minor);
printf ("Application needs at least %d.%d - cannot continue\n", major, minor);
exit (EXIT_FAILURE);
}
}
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment