Created
April 15, 2020 17:42
-
-
Save kevinAlbs/1eb3fd42a2b17d71f99e4d9389661069 to your computer and use it in GitHub Desktop.
A modification of example-sdam-monitoring.c to show that a slow server can block scanning of other servers.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* gcc example-sdam-monitoring.c -o example-sdam-monitoring \ | |
* $(pkg-config --cflags --libs libmongoc-1.0) */ | |
/* ./example-sdam-monitoring [CONNECTION_STRING] */ | |
#include <mongoc/mongoc.h> | |
#define MONGOC_INSIDE | |
#include <mongoc/mongoc-client-private.h> | |
#include <mongoc/mongoc-client-pool-private.h> | |
#include <mongoc/mongoc-cluster-private.h> | |
#include <mongoc/mongoc-topology-private.h> | |
#include <mongoc/mongoc-util-private.h> | |
#include <unistd.h> | |
#include <stdio.h> | |
typedef struct { | |
int server_changed_events; | |
int server_opening_events; | |
int server_closed_events; | |
int topology_changed_events; | |
int topology_opening_events; | |
int topology_closed_events; | |
int heartbeat_started_events; | |
int heartbeat_succeeded_events; | |
int heartbeat_failed_events; | |
} stats_t; | |
static void | |
server_changed (const mongoc_apm_server_changed_t *event) | |
{ | |
stats_t *context; | |
const mongoc_server_description_t *prev_sd, *new_sd; | |
context = (stats_t *) mongoc_apm_server_changed_get_context (event); | |
context->server_changed_events++; | |
prev_sd = mongoc_apm_server_changed_get_previous_description (event); | |
new_sd = mongoc_apm_server_changed_get_new_description (event); | |
MONGOC_DEBUG ("server changed: %s %s -> %s", | |
mongoc_apm_server_changed_get_host (event)->host_and_port, | |
mongoc_server_description_type (prev_sd), | |
mongoc_server_description_type (new_sd)); | |
} | |
static void | |
server_opening (const mongoc_apm_server_opening_t *event) | |
{ | |
stats_t *context; | |
context = (stats_t *) mongoc_apm_server_opening_get_context (event); | |
context->server_opening_events++; | |
MONGOC_DEBUG ("server opening: %s", | |
mongoc_apm_server_opening_get_host (event)->host_and_port); | |
} | |
static void | |
server_closed (const mongoc_apm_server_closed_t *event) | |
{ | |
stats_t *context; | |
context = (stats_t *) mongoc_apm_server_closed_get_context (event); | |
context->server_closed_events++; | |
MONGOC_DEBUG ("server closed: %s", | |
mongoc_apm_server_closed_get_host (event)->host_and_port); | |
} | |
static void | |
topology_changed (const mongoc_apm_topology_changed_t *event) | |
{ | |
stats_t *context; | |
const mongoc_topology_description_t *prev_td; | |
const mongoc_topology_description_t *new_td; | |
mongoc_server_description_t **prev_sds; | |
size_t n_prev_sds; | |
mongoc_server_description_t **new_sds; | |
size_t n_new_sds; | |
size_t i; | |
mongoc_read_prefs_t *prefs; | |
context = (stats_t *) mongoc_apm_topology_changed_get_context (event); | |
context->topology_changed_events++; | |
prev_td = mongoc_apm_topology_changed_get_previous_description (event); | |
prev_sds = mongoc_topology_description_get_servers (prev_td, &n_prev_sds); | |
new_td = mongoc_apm_topology_changed_get_new_description (event); | |
new_sds = mongoc_topology_description_get_servers (new_td, &n_new_sds); | |
MONGOC_DEBUG ("topology changed: %s -> %s", | |
mongoc_topology_description_type (prev_td), | |
mongoc_topology_description_type (new_td)); | |
if (n_prev_sds) { | |
MONGOC_DEBUG (" previous servers:"); | |
for (i = 0; i < n_prev_sds; i++) { | |
MONGOC_DEBUG (" %s %s", | |
mongoc_server_description_type (prev_sds[i]), | |
mongoc_server_description_host (prev_sds[i])->host_and_port); | |
} | |
} | |
if (n_new_sds) { | |
MONGOC_DEBUG (" new servers:"); | |
for (i = 0; i < n_new_sds; i++) { | |
MONGOC_DEBUG (" %s %s", | |
mongoc_server_description_type (new_sds[i]), | |
mongoc_server_description_host (new_sds[i])->host_and_port); | |
} | |
} | |
prefs = mongoc_read_prefs_new (MONGOC_READ_SECONDARY); | |
/* it is safe, and unfortunately necessary, to cast away const here */ | |
if (mongoc_topology_description_has_readable_server ( | |
(mongoc_topology_description_t *) new_td, prefs)) { | |
MONGOC_DEBUG (" secondary AVAILABLE"); | |
} else { | |
MONGOC_DEBUG (" secondary UNAVAILABLE"); | |
} | |
if (mongoc_topology_description_has_writable_server ( | |
(mongoc_topology_description_t *) new_td)) { | |
MONGOC_DEBUG (" primary AVAILABLE"); | |
} else { | |
MONGOC_DEBUG (" primary UNAVAILABLE"); | |
} | |
mongoc_read_prefs_destroy (prefs); | |
mongoc_server_descriptions_destroy_all (prev_sds, n_prev_sds); | |
mongoc_server_descriptions_destroy_all (new_sds, n_new_sds); | |
} | |
static void | |
topology_opening (const mongoc_apm_topology_opening_t *event) | |
{ | |
stats_t *context; | |
context = (stats_t *) mongoc_apm_topology_opening_get_context (event); | |
context->topology_opening_events++; | |
MONGOC_DEBUG ("topology opening"); | |
} | |
static void | |
topology_closed (const mongoc_apm_topology_closed_t *event) | |
{ | |
stats_t *context; | |
context = (stats_t *) mongoc_apm_topology_closed_get_context (event); | |
context->topology_closed_events++; | |
MONGOC_DEBUG ("topology closed"); | |
} | |
static void | |
server_heartbeat_started (const mongoc_apm_server_heartbeat_started_t *event) | |
{ | |
stats_t *context; | |
context = | |
(stats_t *) mongoc_apm_server_heartbeat_started_get_context (event); | |
context->heartbeat_started_events++; | |
MONGOC_DEBUG ("%s heartbeat started", | |
mongoc_apm_server_heartbeat_started_get_host (event)->host_and_port); | |
} | |
static void | |
server_heartbeat_succeeded ( | |
const mongoc_apm_server_heartbeat_succeeded_t *event) | |
{ | |
stats_t *context; | |
char *reply; | |
context = | |
(stats_t *) mongoc_apm_server_heartbeat_succeeded_get_context (event); | |
context->heartbeat_succeeded_events++; | |
reply = bson_as_canonical_extended_json ( | |
mongoc_apm_server_heartbeat_succeeded_get_reply (event), NULL); | |
MONGOC_DEBUG ( | |
"%s heartbeat succeeded: %s", | |
mongoc_apm_server_heartbeat_succeeded_get_host (event)->host_and_port, | |
reply); | |
bson_free (reply); | |
} | |
static void | |
server_heartbeat_failed (const mongoc_apm_server_heartbeat_failed_t *event) | |
{ | |
stats_t *context; | |
bson_error_t error; | |
context = (stats_t *) mongoc_apm_server_heartbeat_failed_get_context (event); | |
context->heartbeat_failed_events++; | |
mongoc_apm_server_heartbeat_failed_get_error (event, &error); | |
MONGOC_DEBUG ("%s heartbeat failed: %s", | |
mongoc_apm_server_heartbeat_failed_get_host (event)->host_and_port, | |
error.message); | |
} | |
typedef struct _debug_stream_stats_t { | |
mongoc_client_t client_hack; | |
int n_destroyed; | |
int n_failed; | |
} debug_stream_stats_t; | |
#define MONGOC_STREAM_DEBUG 7 | |
typedef struct _mongoc_stream_debug_t { | |
mongoc_stream_t vtable; | |
mongoc_stream_t *wrapped; | |
debug_stream_stats_t *stats; | |
bool slow; | |
} mongoc_stream_debug_t; | |
static int | |
_mongoc_stream_debug_close (mongoc_stream_t *stream) | |
{ | |
return mongoc_stream_close (((mongoc_stream_debug_t *) stream)->wrapped); | |
} | |
static void | |
_mongoc_stream_debug_destroy (mongoc_stream_t *stream) | |
{ | |
mongoc_stream_debug_t *debug_stream = (mongoc_stream_debug_t *) stream; | |
debug_stream->stats->n_destroyed++; | |
mongoc_stream_destroy (debug_stream->wrapped); | |
bson_free (debug_stream); | |
} | |
static void | |
_mongoc_stream_debug_failed (mongoc_stream_t *stream) | |
{ | |
mongoc_stream_debug_t *debug_stream = (mongoc_stream_debug_t *) stream; | |
debug_stream->stats->n_failed++; | |
mongoc_stream_failed (debug_stream->wrapped); | |
bson_free (debug_stream); | |
} | |
static int | |
_mongoc_stream_debug_setsockopt (mongoc_stream_t *stream, | |
int level, | |
int optname, | |
void *optval, | |
mongoc_socklen_t optlen) | |
{ | |
return mongoc_stream_setsockopt (((mongoc_stream_debug_t *) stream)->wrapped, | |
level, | |
optname, | |
optval, | |
optlen); | |
} | |
static int | |
_mongoc_stream_debug_flush (mongoc_stream_t *stream) | |
{ | |
return mongoc_stream_flush (((mongoc_stream_debug_t *) stream)->wrapped); | |
} | |
static ssize_t | |
_mongoc_stream_debug_readv (mongoc_stream_t *stream, | |
mongoc_iovec_t *iov, | |
size_t iovcnt, | |
size_t min_bytes, | |
int32_t timeout_msec) | |
{ | |
mongoc_stream_debug_t *debug_stream = (mongoc_stream_debug_t *) stream; | |
if (debug_stream->slow) { | |
_mongoc_usleep (10); | |
return 0; | |
} | |
return mongoc_stream_readv (debug_stream->wrapped, | |
iov, | |
iovcnt, | |
min_bytes, | |
timeout_msec); | |
} | |
static ssize_t | |
_mongoc_stream_debug_writev (mongoc_stream_t *stream, | |
mongoc_iovec_t *iov, | |
size_t iovcnt, | |
int32_t timeout_msec) | |
{ | |
return mongoc_stream_writev ( | |
((mongoc_stream_debug_t *) stream)->wrapped, iov, iovcnt, timeout_msec); | |
} | |
static bool | |
_mongoc_stream_debug_check_closed (mongoc_stream_t *stream) | |
{ | |
return mongoc_stream_check_closed ( | |
((mongoc_stream_debug_t *) stream)->wrapped); | |
} | |
static bool | |
_mongoc_stream_debug_timed_out (mongoc_stream_t *stream) | |
{ | |
return mongoc_stream_timed_out (((mongoc_stream_debug_t *) stream)->wrapped); | |
} | |
static bool | |
_mongoc_stream_debug_should_retry (mongoc_stream_t *stream) | |
{ | |
// return mongoc_stream_should_retry ( | |
// ((mongoc_stream_debug_t *) stream)->wrapped); | |
return true; | |
} | |
static mongoc_stream_t * | |
_mongoc_stream_debug_get_base_stream (mongoc_stream_t *stream) | |
{ | |
mongoc_stream_t *wrapped = ((mongoc_stream_debug_t *) stream)->wrapped; | |
/* "wrapped" is typically a mongoc_stream_buffered_t, get the real | |
* base stream */ | |
if (wrapped->get_base_stream) { | |
return wrapped->get_base_stream (wrapped); | |
} | |
return wrapped; | |
} | |
mongoc_stream_t * | |
debug_stream_new (mongoc_stream_t *stream, debug_stream_stats_t *stats) | |
{ | |
mongoc_stream_debug_t *debug_stream; | |
static int slowdown = 0; | |
if (!stream) { | |
return NULL; | |
} | |
MONGOC_DEBUG ("debug stream new"); | |
debug_stream = (mongoc_stream_debug_t *) bson_malloc0 (sizeof *debug_stream); | |
debug_stream->vtable.type = MONGOC_STREAM_DEBUG; | |
debug_stream->vtable.close = _mongoc_stream_debug_close; | |
debug_stream->vtable.destroy = _mongoc_stream_debug_destroy; | |
debug_stream->vtable.failed = _mongoc_stream_debug_failed; | |
debug_stream->vtable.flush = _mongoc_stream_debug_flush; | |
debug_stream->vtable.readv = _mongoc_stream_debug_readv; | |
debug_stream->vtable.writev = _mongoc_stream_debug_writev; | |
debug_stream->vtable.setsockopt = _mongoc_stream_debug_setsockopt; | |
debug_stream->vtable.check_closed = _mongoc_stream_debug_check_closed; | |
debug_stream->vtable.timed_out = _mongoc_stream_debug_timed_out; | |
debug_stream->vtable.should_retry = _mongoc_stream_debug_should_retry; | |
debug_stream->vtable.get_base_stream = _mongoc_stream_debug_get_base_stream; | |
debug_stream->wrapped = stream; | |
debug_stream->stats = stats; | |
if (slowdown % 2 == 1) { | |
MONGOC_DEBUG ("making stream slow"); | |
debug_stream->slow = true; | |
} | |
slowdown++; | |
return (mongoc_stream_t *) debug_stream; | |
} | |
mongoc_stream_t * | |
debug_stream_initiator (const mongoc_uri_t *uri, | |
const mongoc_host_list_t *host, | |
void *user_data, | |
bson_error_t *error) | |
{ | |
debug_stream_stats_t *stats; | |
mongoc_stream_t *default_stream; | |
stats = (debug_stream_stats_t *) user_data; | |
default_stream = | |
mongoc_client_default_stream_initiator (uri, host, &stats->client_hack, error); | |
return debug_stream_new (default_stream, stats); | |
} | |
int | |
main (int argc, char *argv[]) | |
{ | |
mongoc_client_pool_t *pool; | |
mongoc_client_t *client; | |
mongoc_apm_callbacks_t *cbs; | |
stats_t stats = {0}; | |
const char *uri_string = | |
"mongodb://127.0.0.1/?appname=sdam-monitoring-example"; | |
mongoc_uri_t *uri; | |
bson_t cmd = BSON_INITIALIZER; | |
bson_t reply; | |
bson_error_t error; | |
int64_t start; | |
debug_stream_stats_t dstats = {0}; | |
mongoc_init (); | |
if (argc > 1) { | |
uri_string = argv[1]; | |
} | |
uri = mongoc_uri_new_with_error (uri_string, &error); | |
if (!uri) { | |
fprintf (stderr, | |
"failed to parse URI: %s\n" | |
"error message: %s\n", | |
uri_string, | |
error.message); | |
return EXIT_FAILURE; | |
} | |
pool = mongoc_client_pool_new (uri); | |
_mongoc_client_pool_set_stream_initiator (pool, debug_stream_initiator, &dstats); | |
cbs = mongoc_apm_callbacks_new (); | |
mongoc_apm_set_server_changed_cb (cbs, server_changed); | |
mongoc_apm_set_server_opening_cb (cbs, server_opening); | |
mongoc_apm_set_server_closed_cb (cbs, server_closed); | |
mongoc_apm_set_topology_changed_cb (cbs, topology_changed); | |
mongoc_apm_set_topology_opening_cb (cbs, topology_opening); | |
mongoc_apm_set_topology_closed_cb (cbs, topology_closed); | |
mongoc_apm_set_server_heartbeat_started_cb (cbs, server_heartbeat_started); | |
mongoc_apm_set_server_heartbeat_succeeded_cb (cbs, | |
server_heartbeat_succeeded); | |
mongoc_apm_set_server_heartbeat_failed_cb (cbs, server_heartbeat_failed); | |
mongoc_client_pool_set_apm_callbacks ( | |
pool, cbs, (void *) &stats /* context pointer */); | |
client = mongoc_client_pool_pop (pool); | |
if (!client) { | |
return EXIT_FAILURE; | |
} | |
/* the driver connects on demand to perform first operation */ | |
BSON_APPEND_INT32 (&cmd, "ping", 1); | |
start = bson_get_monotonic_time (); | |
while (bson_get_monotonic_time() - start < 30 * 1000 * 1000) { | |
MONGOC_DEBUG ("sending ping"); | |
if (!mongoc_client_command_simple (client, "admin", &cmd, NULL, &reply, &error)) { | |
MONGOC_DEBUG ("error = %s", error.message); | |
} | |
bson_destroy (&reply); | |
sleep (1); | |
} | |
mongoc_uri_destroy (uri); | |
mongoc_client_destroy (client); | |
printf ("Events:\n" | |
" server changed: %d\n" | |
" server opening: %d\n" | |
" server closed: %d\n" | |
" topology changed: %d\n" | |
" topology opening: %d\n" | |
" topology closed: %d\n" | |
" heartbeat started: %d\n" | |
" heartbeat succeeded: %d\n" | |
" heartbeat failed: %d\n", | |
stats.server_changed_events, | |
stats.server_opening_events, | |
stats.server_closed_events, | |
stats.topology_changed_events, | |
stats.topology_opening_events, | |
stats.topology_closed_events, | |
stats.heartbeat_started_events, | |
stats.heartbeat_succeeded_events, | |
stats.heartbeat_failed_events); | |
bson_destroy (&cmd); | |
mongoc_apm_callbacks_destroy (cbs); | |
mongoc_cleanup (); | |
return EXIT_SUCCESS; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment