Skip to content

Instantly share code, notes, and snippets.

@ShaneHarvey
Last active January 24, 2020 23:14
Show Gist options
  • Save ShaneHarvey/abb528c578d5f7f9b9080c77747251ab to your computer and use it in GitHub Desktop.
Save ShaneHarvey/abb528c578d5f7f9b9080c77747251ab to your computer and use it in GitHub Desktop.
import logging
import time
import threading
from bson import SON
from pymongo import monitoring, MongoClient
from pymongo.server_selectors import writable_server_selector
class ServerLogger(monitoring.ServerListener):
    """Server listener that writes each server event to the root logger.

    Added servers and server type changes are logged at INFO level; removed
    servers are logged at WARNING level.
    """

    def opened(self, event):
        """Log that a server was added to a topology."""
        message = (
            "Server {0.server_address} added to topology "
            "{0.topology_id}"
        ).format(event)
        logging.info(message)

    def description_changed(self, event):
        """Log a server type transition; same-type updates are not logged."""
        before = event.previous_description.server_type
        after = event.new_description.server_type
        if after == before:
            return
        # server_type_name was added in PyMongo 3.4
        message = (
            "Server {0.server_address} changed type from "
            "{0.previous_description.server_type_name} to "
            "{0.new_description.server_type_name}"
        ).format(event)
        logging.info(message)

    def closed(self, event):
        """Log, as a warning, that a server was removed from a topology."""
        message = (
            "Server {0.server_address} removed from topology "
            "{0.topology_id}"
        ).format(event)
        logging.warning(message)
class HeartbeatLogger(monitoring.ServerHeartbeatListener):
    """Heartbeat listener that writes each heartbeat event to the root logger.

    Started and succeeded heartbeats are logged at INFO level; failed
    heartbeats are logged at WARNING level.
    """

    def started(self, event):
        """Log that a heartbeat was sent."""
        message = (
            "Heartbeat sent to server "
            "{0.connection_id}"
        ).format(event)
        logging.info(message)

    def succeeded(self, event):
        """Log a successful heartbeat together with its reply document."""
        # The reply.document attribute was added in PyMongo 3.4.
        message = (
            "Heartbeat to server {0.connection_id} "
            "succeeded with reply "
            "{0.reply.document}"
        ).format(event)
        logging.info(message)

    def failed(self, event):
        """Log a failed heartbeat, appending ``reply.details`` when present."""
        # Only some replies carry a ``details`` attribute; when it is missing
        # nothing extra is appended to the message.
        extra = (
            ": {0}".format(event.reply.details)
            if hasattr(event.reply, 'details')
            else ""
        )
        message = (
            "Heartbeat to server {0.connection_id} "
            "failed with error {0.reply}{1}"
        ).format(event, extra)
        logging.warning(message)
class TopologyLogger(monitoring.TopologyListener):
    """Topology listener that writes each topology event to the root logger."""

    def opened(self, event):
        """Log that a topology was opened."""
        message = (
            "Topology with id {0.topology_id} "
            "opened"
        ).format(event)
        logging.info(message)

    def description_changed(self, event):
        """Log a topology update, any type change, and a missing writable server."""
        updated = (
            "Topology description updated for "
            "topology id {0.topology_id}"
        ).format(event)
        logging.info(updated)

        before = event.previous_description.topology_type
        after = event.new_description.topology_type
        if after != before:
            # topology_type_name was added in PyMongo 3.4
            changed = (
                "Topology {0.topology_id} changed type from "
                "{0.previous_description.topology_type_name} to "
                "{0.new_description.topology_type_name}"
            ).format(event)
            logging.info(changed)

        # The has_writable_server and has_readable_server methods
        # were added in PyMongo 3.4.
        writable = event.new_description.has_writable_server()
        if not writable:
            logging.warning("No writable servers available.")

    def closed(self, event):
        """Log that a topology was closed."""
        message = (
            "Topology with id {0.topology_id} "
            "closed"
        ).format(event)
        logging.info(message)
class CommandLogger(monitoring.CommandListener):
    """Command listener that writes each command event to the root logger.

    All three events (started, succeeded, failed) are logged at INFO level.
    """

    def started(self, event):
        """Log the name, request id and target server of a started command."""
        message = (
            "Command {0.command_name} with request id "
            "{0.request_id} started on server "
            "{0.connection_id}"
        ).format(event)
        logging.info(message)

    def succeeded(self, event):
        """Log a successful command and its duration in microseconds."""
        message = (
            "Command {0.command_name} with request id "
            "{0.request_id} on server {0.connection_id} "
            "succeeded in {0.duration_micros} "
            "microseconds"
        ).format(event)
        logging.info(message)

    def failed(self, event):
        """Log a failed command and its duration in microseconds."""
        message = (
            "Command {0.command_name} with request id "
            "{0.request_id} on server {0.connection_id} "
            "failed in {0.duration_micros} "
            "microseconds"
        ).format(event)
        logging.info(message)
# Layout of every log line: timestamp, level, emitting thread name and the
# source line of the logging call, followed by the message itself.
LOGGING_FORMAT = (
    '%(asctime)s [%(levelname)s] '
    '%(threadName)s:%(lineno)d - %(message)s'
)
def get_pool(client):
    """Return the pool of the server chosen by ``writable_server_selector``.

    The original summary describes this as the standalone, primary, or
    mongos pool.

    NOTE(review): relies on the private ``client._get_topology()`` accessor,
    which may change between PyMongo releases.
    """
    return (
        client._get_topology()
        .select_server(writable_server_selector)
        .pool
    )
def insert(client):
    """Insert one empty document into ``test.test`` and log how long it took.

    Args:
        client: Object exposing ``client.test.test.insert_one`` (a
            ``MongoClient`` in this script).

    Any exception raised by ``insert_one`` propagates to the caller; in that
    case no timing is logged.
    """
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or go negative) when the system clock is adjusted mid-insert, which
    # can happen with the wall-clock time.time().
    started = time.perf_counter()
    client.test.test.insert_one({})
    elapsed = time.perf_counter() - started
    logging.info('INSERT took: %s', elapsed)
def _fail_point(client, command_args):
    """Run a ``configureFailPoint`` command for ``failCommand`` on ``admin``.

    The command document is a ``SON`` created with ``configureFailPoint`` as
    its first key; the entries of ``command_args`` are then merged into it
    before it is sent.
    """
    fail_point_cmd = SON([('configureFailPoint', 'failCommand')])
    fail_point_cmd.update(command_args)
    admin_db = client.admin
    admin_db.command(fail_point_cmd)
def fail_insert(client, network_error=False):
    """Arm the ``failCommand`` fail point against ``insert`` commands.

    Args:
        client: Client used to send the fail point command.
        network_error: Passed through as the fail point's
            ``closeConnection`` value.

    The fail point is configured with ``mode`` ``{"times": 1}`` and error
    code 10107 (NotMaster).
    """
    failure_spec = {
        "failCommands": ["insert"],
        "errorCode": 10107,  # NotMaster
        "closeConnection": network_error,
    }
    _fail_point(client, {
        "configureFailPoint": "failCommand",
        "mode": {"times": 1},
        "data": failure_spec,
    })
def cause_failover_after(seconds):
    """Wait ``seconds``, then send ``replSetStepDown`` to trigger a failover.

    A dedicated client is opened before the wait begins and is closed when
    the function returns.
    """
    with get_client() as stepdown_client:
        time.sleep(seconds)
        logging.info('Causing failover via replSetStepDown\n')
        # The value 10 is the replSetStepDown command argument
        # (presumably the step-down period in seconds -- TODO confirm).
        admin_db = stepdown_client.admin
        admin_db.command('replSetStepDown', 10)
def get_client(**kwargs):
    """Return a client connected to the whole replica set.

    A temporary default ``MongoClient()`` runs ``isMaster`` to learn the
    replica set name and host list. The returned client is built from those
    with ``retryWrites=True``; ``kwargs`` are forwarded to it unchanged.
    """
    # Discover the replica set name.
    with MongoClient() as probe:
        reply = probe.admin.command('isMaster')
    set_name, members = reply['setName'], reply['hosts']
    return MongoClient(
        members, replicaSet=set_name, retryWrites=True, **kwargs)
def main():
    """Exercise retryable inserts through injected errors and a failover.

    Steps, in order:
      1. Configure logging and open a client with the server, heartbeat and
         topology listeners attached.
      2. Perform one plain insert.
      3. Arm a fail point with ``network_error=True`` and insert again.
      4. Arm a fail point with ``network_error=False`` (NotMaster) and
         insert again.
      5. Insert repeatedly for 10 seconds while a background thread calls
         ``cause_failover_after(3)``.

    The client is closed on exit, and the background thread is always
    joined, even if an insert raises.
    """
    logging.basicConfig(format=LOGGING_FORMAT, level=logging.INFO)
    listeners = [ServerLogger(), HeartbeatLogger(), TopologyLogger()]
    # Use the client as a context manager (as get_client and
    # cause_failover_after already do) so it is closed on every exit path
    # instead of being leaked.
    with get_client(event_listeners=listeners) as client:
        logging.info('Primary: %s', client.admin.command('isMaster')['me'])
        insert(client)

        # Cause a network error
        logging.info('Causing a network error with failCommand\n')
        fail_insert(client, network_error=True)
        insert(client)

        # Cause a NotMaster error
        logging.info('Causing a NotMaster error with failCommand\n')
        fail_insert(client, network_error=False)
        insert(client)

        # Run retryable writes for 10 seconds.
        thread = threading.Thread(target=cause_failover_after, args=(3,))
        thread.start()
        try:
            # A monotonic deadline keeps the loop at 10 seconds even if the
            # wall clock is adjusted while it runs.
            deadline = time.monotonic() + 10
            while time.monotonic() < deadline:
                insert(client)
                time.sleep(0.05)
        finally:
            # Never leave the failover thread un-joined if an insert raises.
            thread.join()
# for i in range(2):
# if i == 1:
# # Cause a network error.
# logging.info('Causing a network error')
# pool = get_pool(client)
# for sock in pool.sockets:
# sock.sock.close()
# insert(client)
# time.sleep(0.05)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment