Skip to content

Instantly share code, notes, and snippets.

@kevinAlbs
Created April 15, 2020 17:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kevinAlbs/1eb3fd42a2b17d71f99e4d9389661069 to your computer and use it in GitHub Desktop.
Save kevinAlbs/1eb3fd42a2b17d71f99e4d9389661069 to your computer and use it in GitHub Desktop.
A modification of example-sdam-monitoring.c to show that a slow server can block scanning of other servers.
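/* gcc example-sdam-monitoring.c -o example-sdam-monitoring \
* $(pkg-config --cflags --libs libmongoc-1.0) */
/* NOTE(review): this modified version includes libmongoc *-private.h headers
* and calls private functions, so the command above is presumably not
* sufficient on its own; it likely has to be built against a libmongoc source
* tree -- TODO confirm. */
/* ./example-sdam-monitoring [CONNECTION_STRING] */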
#include <mongoc/mongoc.h>
#define MONGOC_INSIDE
#include <mongoc/mongoc-client-private.h>
#include <mongoc/mongoc-client-pool-private.h>
#include <mongoc/mongoc-cluster-private.h>
#include <mongoc/mongoc-topology-private.h>
#include <mongoc/mongoc-util-private.h>
#include <unistd.h>
#include <stdio.h>
/* Counters for the SDAM monitoring events seen during one run. main() passes
 * a pointer to one instance as the APM callback context; each callback below
 * fetches that context from its event and increments the matching field. */
typedef struct {
int server_changed_events; /* incremented by server_changed() */
int server_opening_events; /* incremented by server_opening() */
int server_closed_events; /* incremented by server_closed() */
int topology_changed_events; /* incremented by topology_changed() */
int topology_opening_events; /* incremented by topology_opening() */
int topology_closed_events; /* incremented by topology_closed() */
int heartbeat_started_events; /* incremented by server_heartbeat_started() */
int heartbeat_succeeded_events; /* incremented by server_heartbeat_succeeded() */
int heartbeat_failed_events; /* incremented by server_heartbeat_failed() */
} stats_t;
/* APM callback for "server changed" events: bumps the counter in the stats_t
 * context and logs the server's host:port together with its previous and new
 * description types. */
static void
server_changed (const mongoc_apm_server_changed_t *event)
{
   stats_t *counters =
      (stats_t *) mongoc_apm_server_changed_get_context (event);
   const mongoc_server_description_t *before =
      mongoc_apm_server_changed_get_previous_description (event);
   const mongoc_server_description_t *after =
      mongoc_apm_server_changed_get_new_description (event);
   const char *address =
      mongoc_apm_server_changed_get_host (event)->host_and_port;

   counters->server_changed_events += 1;

   MONGOC_DEBUG ("server changed: %s %s -> %s",
                 address,
                 mongoc_server_description_type (before),
                 mongoc_server_description_type (after));
}
/* APM callback for "server opening" events: bumps the counter in the stats_t
 * context and logs the server's host:port. */
static void
server_opening (const mongoc_apm_server_opening_t *event)
{
   stats_t *counters =
      (stats_t *) mongoc_apm_server_opening_get_context (event);
   const char *address =
      mongoc_apm_server_opening_get_host (event)->host_and_port;

   counters->server_opening_events += 1;
   MONGOC_DEBUG ("server opening: %s", address);
}
/* APM callback for "server closed" events: bumps the counter in the stats_t
 * context and logs the server's host:port. */
static void
server_closed (const mongoc_apm_server_closed_t *event)
{
   stats_t *counters =
      (stats_t *) mongoc_apm_server_closed_get_context (event);
   const char *address =
      mongoc_apm_server_closed_get_host (event)->host_and_port;

   counters->server_closed_events += 1;
   MONGOC_DEBUG ("server closed: %s", address);
}
/* APM callback for "topology changed" events.
 *
 * Bumps the counter in the stats_t context, logs the previous and new topology
 * types, lists the servers of each description (when there are any), and
 * reports whether the new description has a server readable with a SECONDARY
 * read preference and whether it has a writable server.
 *
 * The server arrays obtained from the descriptions and the read preference
 * created here are all released before returning. */
static void
topology_changed (const mongoc_apm_topology_changed_t *event)
{
   stats_t *counters =
      (stats_t *) mongoc_apm_topology_changed_get_context (event);
   const mongoc_topology_description_t *old_td =
      mongoc_apm_topology_changed_get_previous_description (event);
   const mongoc_topology_description_t *cur_td =
      mongoc_apm_topology_changed_get_new_description (event);
   mongoc_server_description_t **old_servers;
   mongoc_server_description_t **cur_servers;
   size_t old_count;
   size_t cur_count;
   size_t idx;
   mongoc_read_prefs_t *secondary_prefs;
   bool secondary_ok;
   bool primary_ok;

   counters->topology_changed_events += 1;

   old_servers = mongoc_topology_description_get_servers (old_td, &old_count);
   cur_servers = mongoc_topology_description_get_servers (cur_td, &cur_count);

   MONGOC_DEBUG ("topology changed: %s -> %s",
                 mongoc_topology_description_type (old_td),
                 mongoc_topology_description_type (cur_td));

   if (old_count > 0) {
      MONGOC_DEBUG (" previous servers:");
      for (idx = 0; idx != old_count; ++idx) {
         const mongoc_server_description_t *sd = old_servers[idx];

         MONGOC_DEBUG (" %s %s",
                       mongoc_server_description_type (sd),
                       mongoc_server_description_host (sd)->host_and_port);
      }
   }

   if (cur_count > 0) {
      MONGOC_DEBUG (" new servers:");
      for (idx = 0; idx != cur_count; ++idx) {
         const mongoc_server_description_t *sd = cur_servers[idx];

         MONGOC_DEBUG (" %s %s",
                       mongoc_server_description_type (sd),
                       mongoc_server_description_host (sd)->host_and_port);
      }
   }

   secondary_prefs = mongoc_read_prefs_new (MONGOC_READ_SECONDARY);

   /* The two "has ... server" queries take a non-const description, so the
    * const qualifier has to be cast away here. */
   secondary_ok = mongoc_topology_description_has_readable_server (
      (mongoc_topology_description_t *) cur_td, secondary_prefs);
   if (!secondary_ok) {
      MONGOC_DEBUG (" secondary UNAVAILABLE");
   } else {
      MONGOC_DEBUG (" secondary AVAILABLE");
   }

   primary_ok = mongoc_topology_description_has_writable_server (
      (mongoc_topology_description_t *) cur_td);
   if (!primary_ok) {
      MONGOC_DEBUG (" primary UNAVAILABLE");
   } else {
      MONGOC_DEBUG (" primary AVAILABLE");
   }

   mongoc_read_prefs_destroy (secondary_prefs);
   mongoc_server_descriptions_destroy_all (old_servers, old_count);
   mongoc_server_descriptions_destroy_all (cur_servers, cur_count);
}
/* APM callback for "topology opening" events: bumps the counter in the
 * stats_t context and logs a fixed message. */
static void
topology_opening (const mongoc_apm_topology_opening_t *event)
{
   stats_t *counters =
      (stats_t *) mongoc_apm_topology_opening_get_context (event);

   counters->topology_opening_events += 1;
   MONGOC_DEBUG ("topology opening");
}
/* APM callback for "topology closed" events: bumps the counter in the stats_t
 * context and logs a fixed message. */
static void
topology_closed (const mongoc_apm_topology_closed_t *event)
{
   stats_t *counters =
      (stats_t *) mongoc_apm_topology_closed_get_context (event);

   counters->topology_closed_events += 1;
   MONGOC_DEBUG ("topology closed");
}
/* APM callback for "heartbeat started" events: bumps the counter in the
 * stats_t context and logs the host:port of the server being checked. */
static void
server_heartbeat_started (const mongoc_apm_server_heartbeat_started_t *event)
{
   stats_t *counters =
      (stats_t *) mongoc_apm_server_heartbeat_started_get_context (event);
   const char *address =
      mongoc_apm_server_heartbeat_started_get_host (event)->host_and_port;

   counters->heartbeat_started_events += 1;
   MONGOC_DEBUG ("%s heartbeat started", address);
}
/* APM callback for "heartbeat succeeded" events: bumps the counter in the
 * stats_t context and logs the server's host:port followed by the heartbeat
 * reply rendered as canonical extended JSON. The JSON string is freed before
 * returning. */
static void
server_heartbeat_succeeded (
   const mongoc_apm_server_heartbeat_succeeded_t *event)
{
   stats_t *counters =
      (stats_t *) mongoc_apm_server_heartbeat_succeeded_get_context (event);
   const char *address =
      mongoc_apm_server_heartbeat_succeeded_get_host (event)->host_and_port;
   char *reply_json = bson_as_canonical_extended_json (
      mongoc_apm_server_heartbeat_succeeded_get_reply (event), NULL);

   counters->heartbeat_succeeded_events += 1;

   MONGOC_DEBUG ("%s heartbeat succeeded: %s", address, reply_json);
   bson_free (reply_json);
}
/* APM callback for "heartbeat failed" events: bumps the counter in the stats_t
 * context, copies the failure out of the event, and logs the server's
 * host:port with the error message. */
static void
server_heartbeat_failed (const mongoc_apm_server_heartbeat_failed_t *event)
{
   stats_t *counters =
      (stats_t *) mongoc_apm_server_heartbeat_failed_get_context (event);
   const char *address =
      mongoc_apm_server_heartbeat_failed_get_host (event)->host_and_port;
   bson_error_t failure;

   counters->heartbeat_failed_events += 1;

   mongoc_apm_server_heartbeat_failed_get_error (event, &failure);
   MONGOC_DEBUG ("%s heartbeat failed: %s", address, failure.message);
}
/* Shared bookkeeping for all debug streams. main() zero-initializes one
 * instance and registers it as the user_data of debug_stream_initiator(). */
typedef struct _debug_stream_stats_t {
/* Placeholder client object: its address is what debug_stream_initiator()
 * passes as the user_data argument of
 * mongoc_client_default_stream_initiator(). It is only ever zero-initialized
 * in this file. */
mongoc_client_t client_hack;
int n_destroyed; /* incremented by _mongoc_stream_debug_destroy() */
int n_failed; /* incremented by _mongoc_stream_debug_failed() */
} debug_stream_stats_t;
/* Value stored in vtable.type by debug_stream_new() to tag a debug stream. */
#define MONGOC_STREAM_DEBUG 7
/* A stream that wraps another stream. Most operations are forwarded to the
 * wrapped stream; see the individual _mongoc_stream_debug_* functions for the
 * exceptions (readv on a slow stream, should_retry). */
typedef struct _mongoc_stream_debug_t {
/* Must stay the first member: the functions in this file cast between
 * mongoc_stream_t * and mongoc_stream_debug_t *. */
mongoc_stream_t vtable;
mongoc_stream_t *wrapped; /* stream that operations are forwarded to */
debug_stream_stats_t *stats; /* counters updated on destroy / failed */
bool slow; /* when true, readv sleeps briefly and returns 0 without reading */
} mongoc_stream_debug_t;
/* close handler: forwards to the wrapped stream and returns its result. */
static int
_mongoc_stream_debug_close (mongoc_stream_t *stream)
{
   mongoc_stream_debug_t *self = (mongoc_stream_debug_t *) stream;

   return mongoc_stream_close (self->wrapped);
}
/* destroy handler: counts the destruction in the shared stats, destroys the
 * wrapped stream, then frees the wrapper itself. */
static void
_mongoc_stream_debug_destroy (mongoc_stream_t *stream)
{
   mongoc_stream_debug_t *self = (mongoc_stream_debug_t *) stream;
   mongoc_stream_t *inner = self->wrapped;

   self->stats->n_destroyed += 1;
   mongoc_stream_destroy (inner);
   bson_free (self);
}
/* failed handler: counts the failure in the shared stats, reports the failure
 * to the wrapped stream, then frees the wrapper itself. */
static void
_mongoc_stream_debug_failed (mongoc_stream_t *stream)
{
   mongoc_stream_debug_t *self = (mongoc_stream_debug_t *) stream;
   mongoc_stream_t *inner = self->wrapped;

   self->stats->n_failed += 1;
   mongoc_stream_failed (inner);
   bson_free (self);
}
/* setsockopt handler: forwards all arguments unchanged to the wrapped stream
 * and returns its result. */
static int
_mongoc_stream_debug_setsockopt (mongoc_stream_t *stream,
                                 int level,
                                 int optname,
                                 void *optval,
                                 mongoc_socklen_t optlen)
{
   mongoc_stream_debug_t *self = (mongoc_stream_debug_t *) stream;

   return mongoc_stream_setsockopt (
      self->wrapped, level, optname, optval, optlen);
}
/* flush handler: forwards to the wrapped stream and returns its result. */
static int
_mongoc_stream_debug_flush (mongoc_stream_t *stream)
{
   mongoc_stream_debug_t *self = (mongoc_stream_debug_t *) stream;

   return mongoc_stream_flush (self->wrapped);
}
/* readv handler.
 *
 * A normal stream forwards the read to the wrapped stream and returns its
 * result. A stream marked slow never touches the wrapped stream: it calls
 * _mongoc_usleep (10) and returns 0, which is how this example simulates a
 * server that does not answer. */
static ssize_t
_mongoc_stream_debug_readv (mongoc_stream_t *stream,
                            mongoc_iovec_t *iov,
                            size_t iovcnt,
                            size_t min_bytes,
                            int32_t timeout_msec)
{
   mongoc_stream_debug_t *self = (mongoc_stream_debug_t *) stream;

   if (!self->slow) {
      return mongoc_stream_readv (
         self->wrapped, iov, iovcnt, min_bytes, timeout_msec);
   }

   _mongoc_usleep (10);
   return 0;
}
/* writev handler: forwards the write unchanged to the wrapped stream and
 * returns its result (slow streams still write normally). */
static ssize_t
_mongoc_stream_debug_writev (mongoc_stream_t *stream,
                             mongoc_iovec_t *iov,
                             size_t iovcnt,
                             int32_t timeout_msec)
{
   mongoc_stream_debug_t *self = (mongoc_stream_debug_t *) stream;

   return mongoc_stream_writev (self->wrapped, iov, iovcnt, timeout_msec);
}
/* check_closed handler: forwards to the wrapped stream and returns its
 * result. */
static bool
_mongoc_stream_debug_check_closed (mongoc_stream_t *stream)
{
   mongoc_stream_debug_t *self = (mongoc_stream_debug_t *) stream;

   return mongoc_stream_check_closed (self->wrapped);
}
/* timed_out handler: forwards to the wrapped stream and returns its result. */
static bool
_mongoc_stream_debug_timed_out (mongoc_stream_t *stream)
{
   mongoc_stream_debug_t *self = (mongoc_stream_debug_t *) stream;

   return mongoc_stream_timed_out (self->wrapped);
}
/* should_retry handler: unconditionally answers true. Unlike the other
 * handlers it does not forward to the wrapped stream, so the wrapped stream's
 * own should_retry is never consulted. */
static bool
_mongoc_stream_debug_should_retry (mongoc_stream_t *stream)
{
   (void) stream; /* intentionally unused */

   return true;
}
/* get_base_stream handler: returns the base stream underneath the wrapped
 * stream. If the wrapped stream provides its own get_base_stream (it is
 * typically a buffered stream that does), that result is returned; otherwise
 * the wrapped stream itself is the base. */
static mongoc_stream_t *
_mongoc_stream_debug_get_base_stream (mongoc_stream_t *stream)
{
   mongoc_stream_debug_t *self = (mongoc_stream_debug_t *) stream;
   mongoc_stream_t *inner = self->wrapped;

   return inner->get_base_stream ? inner->get_base_stream (inner) : inner;
}
/* Wraps `stream` in a newly allocated debug stream.
 *
 * Returns NULL (and allocates nothing) when `stream` is NULL. Otherwise the
 * wrapper's handlers are installed, `stream` and `stats` are stored in it, and
 * the wrapper is returned as a mongoc_stream_t *.
 *
 * A function-local static counter tracks how many wrappers this function has
 * created; every second one (the 2nd, 4th, ...) is marked slow, which makes
 * its readv handler stall instead of reading. */
mongoc_stream_t *
debug_stream_new (mongoc_stream_t *stream, debug_stream_stats_t *stats)
{
   static int created_so_far = 0;
   mongoc_stream_debug_t *self;
   mongoc_stream_t *ops;

   if (stream == NULL) {
      return NULL;
   }

   MONGOC_DEBUG ("debug stream new");

   /* zero-filled allocation: `slow` starts out false */
   self = (mongoc_stream_debug_t *) bson_malloc0 (sizeof *self);

   ops = &self->vtable;
   ops->type = MONGOC_STREAM_DEBUG;
   ops->close = _mongoc_stream_debug_close;
   ops->destroy = _mongoc_stream_debug_destroy;
   ops->failed = _mongoc_stream_debug_failed;
   ops->flush = _mongoc_stream_debug_flush;
   ops->readv = _mongoc_stream_debug_readv;
   ops->writev = _mongoc_stream_debug_writev;
   ops->setsockopt = _mongoc_stream_debug_setsockopt;
   ops->check_closed = _mongoc_stream_debug_check_closed;
   ops->timed_out = _mongoc_stream_debug_timed_out;
   ops->should_retry = _mongoc_stream_debug_should_retry;
   ops->get_base_stream = _mongoc_stream_debug_get_base_stream;

   self->wrapped = stream;
   self->stats = stats;

   if (created_so_far % 2 == 1) {
      MONGOC_DEBUG ("making stream slow");
      self->slow = true;
   }
   created_so_far += 1;

   return (mongoc_stream_t *) self;
}
/* Stream initiator installed on the pool by main().
 *
 * `user_data` is the debug_stream_stats_t registered with the initiator. The
 * default libmongoc initiator opens the real stream (it is handed the
 * placeholder client embedded in the stats), and the result is wrapped with
 * debug_stream_new(), which yields NULL when the default initiator failed. */
mongoc_stream_t *
debug_stream_initiator (const mongoc_uri_t *uri,
                        const mongoc_host_list_t *host,
                        void *user_data,
                        bson_error_t *error)
{
   debug_stream_stats_t *stats = (debug_stream_stats_t *) user_data;
   mongoc_stream_t *inner = mongoc_client_default_stream_initiator (
      uri, host, &stats->client_hack, error);

   return debug_stream_new (inner, stats);
}
/* Entry point.
 *
 * Builds a client pool for the URI given as argv[1] (or a localhost default),
 * installs the debug stream initiator and the SDAM monitoring callbacks, then
 * sends a "ping" command about once a second for 30 seconds. Finally it prints
 * how many events of each kind were observed.
 *
 * Returns EXIT_SUCCESS normally, EXIT_FAILURE if the URI cannot be parsed or
 * no client can be obtained from the pool. */
int
main (int argc, char *argv[])
{
   mongoc_client_pool_t *pool;
   mongoc_client_t *client;
   mongoc_apm_callbacks_t *cbs;
   stats_t stats = {0};
   const char *uri_string =
      "mongodb://127.0.0.1/?appname=sdam-monitoring-example";
   mongoc_uri_t *uri;
   bson_t cmd = BSON_INITIALIZER;
   bson_t reply;
   bson_error_t error;
   int64_t start;
   debug_stream_stats_t dstats = {0};

   mongoc_init ();

   if (argc > 1) {
      uri_string = argv[1];
   }

   uri = mongoc_uri_new_with_error (uri_string, &error);
   if (!uri) {
      fprintf (stderr,
               "failed to parse URI: %s\n"
               "error message: %s\n",
               uri_string,
               error.message);
      bson_destroy (&cmd);
      mongoc_cleanup ();
      return EXIT_FAILURE;
   }

   pool = mongoc_client_pool_new (uri);
   _mongoc_client_pool_set_stream_initiator (
      pool, debug_stream_initiator, &dstats);

   cbs = mongoc_apm_callbacks_new ();
   mongoc_apm_set_server_changed_cb (cbs, server_changed);
   mongoc_apm_set_server_opening_cb (cbs, server_opening);
   mongoc_apm_set_server_closed_cb (cbs, server_closed);
   mongoc_apm_set_topology_changed_cb (cbs, topology_changed);
   mongoc_apm_set_topology_opening_cb (cbs, topology_opening);
   mongoc_apm_set_topology_closed_cb (cbs, topology_closed);
   mongoc_apm_set_server_heartbeat_started_cb (cbs, server_heartbeat_started);
   mongoc_apm_set_server_heartbeat_succeeded_cb (cbs,
                                                 server_heartbeat_succeeded);
   mongoc_apm_set_server_heartbeat_failed_cb (cbs, server_heartbeat_failed);
   mongoc_client_pool_set_apm_callbacks (
      pool, cbs, (void *) &stats /* context pointer */);

   client = mongoc_client_pool_pop (pool);
   if (!client) {
      mongoc_client_pool_destroy (pool);
      mongoc_apm_callbacks_destroy (cbs);
      mongoc_uri_destroy (uri);
      bson_destroy (&cmd);
      mongoc_cleanup ();
      return EXIT_FAILURE;
   }

   /* the driver connects on demand to perform first operation */
   BSON_APPEND_INT32 (&cmd, "ping", 1);

   /* keep pinging for 30 seconds (monotonic time is in microseconds) */
   start = bson_get_monotonic_time ();
   while (bson_get_monotonic_time () - start < 30 * 1000 * 1000) {
      MONGOC_DEBUG ("sending ping");
      if (!mongoc_client_command_simple (
             client, "admin", &cmd, NULL, &reply, &error)) {
         MONGOC_DEBUG ("error = %s", error.message);
      }
      /* reply is initialized by the command call whether or not it succeeds */
      bson_destroy (&reply);
      sleep (1);
   }

   /* A client obtained from a pool is handed back with push, not destroyed
    * directly. Destroying the pool afterwards tears down the shared topology
    * and its streams; doing this before printing means the closing events
    * are expected to be included in the totals below, and nothing is still
    * updating `stats` while it is read. `stats` and `dstats` must outlive the
    * pool, which they do as locals of main(). */
   mongoc_client_pool_push (pool, client);
   mongoc_client_pool_destroy (pool);
   mongoc_uri_destroy (uri);

   printf ("Events:\n"
           " server changed: %d\n"
           " server opening: %d\n"
           " server closed: %d\n"
           " topology changed: %d\n"
           " topology opening: %d\n"
           " topology closed: %d\n"
           " heartbeat started: %d\n"
           " heartbeat succeeded: %d\n"
           " heartbeat failed: %d\n",
           stats.server_changed_events,
           stats.server_opening_events,
           stats.server_closed_events,
           stats.topology_changed_events,
           stats.topology_opening_events,
           stats.topology_closed_events,
           stats.heartbeat_started_events,
           stats.heartbeat_succeeded_events,
           stats.heartbeat_failed_events);

   bson_destroy (&cmd);
   mongoc_apm_callbacks_destroy (cbs);
   mongoc_cleanup ();

   return EXIT_SUCCESS;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment