Skip to content

Instantly share code, notes, and snippets.

@nelhage
Last active December 18, 2015 11:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nelhage/5777797 to your computer and use it in GitHub Desktop.
Save nelhage/5777797 to your computer and use it in GitHub Desktop.
import os
import subprocess
import tempfile
import shutil
import time
import logging
from contextlib import contextmanager
import pymongo
class ReplSet(object):
    """Run a throwaway MongoDB replica set of local ``mongod`` processes.

    Typical use is ``start_mongos()`` followed by ``start_replset()``, with
    ``cleanup()`` called when done. Member ``i`` listens on ``port + i`` and
    stores its data and log under a fresh temporary directory.

    Args:
        mongod: Path (or name on ``PATH``) of the ``mongod`` executable.
        port: Base port; members use ``port`` .. ``port + NUM_MEMBERS - 1``.
        verbose: Verbosity count; ``n > 0`` passes ``-v`` repeated ``n``
            times (e.g. ``-vv``) to every ``mongod``.
    """

    # Number of mongod processes launched and listed in the replset config.
    NUM_MEMBERS = 3

    def __init__(self, mongod='mongod', port=9000, verbose=0):
        self.mongod = mongod
        self.port = port
        self.verbose = verbose
        # Filled in by start_mongos(). Initialised here so that cleanup() is
        # safe to call even when start-up never ran.
        self.tempdir = None
        self.processes = []

    def start_mongos(self):
        """Create the temp directory and launch one mongod per member.

        Despite the name, this starts ``mongod`` processes (the name is kept
        for existing callers).
        """
        self.tempdir = tempfile.mkdtemp()
        self.processes = []
        self.rs_name = 'rs_%d' % os.getpid()

        # The verbosity flag is the same for every member, so build it once.
        extra_args = []
        if self.verbose > 0:
            extra_args.append('-' + 'v' * self.verbose)

        for i in range(self.NUM_MEMBERS):
            port = self.port + i
            d = os.path.join(self.tempdir, "mongo-%d" % (port,))
            os.mkdir(d)
            mongo = subprocess.Popen([
                self.mongod,
                '--smallfiles', '--noprealloc', '--journal', '--nopreallocj',
                '--port', str(port),
                '--replSet', self.rs_name,
                '--dbpath', d,
                '--logpath', os.path.join(d, 'mongo.log')]
                + extra_args)
            # Record each process immediately so cleanup() can reap it even
            # if a later launch fails.
            self.processes.append(mongo)

    def connect(self, port):
        """Return a MongoClient for ``localhost:port``.

        Retries once per second, indefinitely, while pymongo raises
        ``ConnectionFailure``.
        """
        while True:
            try:
                return pymongo.MongoClient(
                    'localhost', port,
                    read_preference=pymongo.read_preferences.ReadPreference.PRIMARY_PREFERRED)
            except pymongo.errors.ConnectionFailure as e:
                logging.info("Unable to connect: %s", e)
                logging.info("Retrying...")
                time.sleep(1)

    def connect_primary(self):
        """Return a client connected to the port of the current primary.

        Connects to the base port first, asks which member is primary, and
        then connects to that member's port on localhost.
        """
        client = self.connect(self.port)
        # The primary is reported as "host:port"; take the text after the
        # last colon so a host containing colons cannot confuse the parse.
        primary_port = int(self.wait_primary(client).rsplit(":", 1)[1])
        return self.connect(primary_port)

    def wait_primary(self, client):
        """Block until ``isMaster`` reports a primary and return it.

        Polls every 0.1 seconds. ``OperationFailure`` from the command is
        tolerated and the poll is retried.

        Returns:
            The ``primary`` field of the ``isMaster`` response.
        """
        while True:
            try:
                primary = client['admin'].command({'isMaster': 1}).get('primary', None)
                if primary:
                    return primary
            except pymongo.errors.OperationFailure as e:
                # Keep polling, but leave a trace instead of failing silently.
                logging.debug("isMaster failed while waiting for a primary: %s", e)
            time.sleep(0.1)

    def start_replset(self):
        """Initiate the replica set and wait until it has settled.

        Sends ``replSetInitiate`` listing every member at
        ``127.0.0.1:<port + i>``, waits for a primary, and then polls
        ``replSetGetStatus`` once per second until the set of member states
        is exactly ``{1, 2}`` (PRIMARY and SECONDARY in MongoDB's replica
        set member state numbering).
        """
        client = self.connect(self.port)
        config = {
            '_id': self.rs_name,
            'members': [
                {'_id': i, 'host': "127.0.0.1:%d" % (self.port + i)}
                for i in range(self.NUM_MEMBERS)
            ],
        }
        client['admin'].command({'replSetInitiate': config})
        logging.info("Waiting for the replset to initialize...")
        self.wait_primary(client)
        while True:
            status = client['admin'].command({'replSetGetStatus': 1})
            states = [m['state'] for m in status['members']]
            if set(states) == set([1, 2]):
                return
            time.sleep(1)

    def cleanup(self):
        """Kill every mongod that was started and delete the temp directory.

        Safe to call more than once, and safe to call when nothing was
        started.
        """
        for p in self.processes:
            # Only signal processes that are still running.
            if p.poll() is None:
                p.kill()
            # Reap the child so it does not linger as a zombie and has
            # released its files before the directory is removed.
            p.wait()
        self.processes = []
        if self.tempdir is not None:
            shutil.rmtree(self.tempdir)
            self.tempdir = None
@contextmanager
def replset(*args, **kwargs):
    """Context manager yielding a started, initialised ``ReplSet``.

    All arguments are forwarded to the ``ReplSet`` constructor. On exit,
    including when start-up or the ``with`` body raises, ``cleanup()`` is
    called on the instance.
    """
    # Construct outside the try block: if the constructor raises there is
    # nothing to clean up, and the finally clause must not touch an unbound
    # name and mask the real error.
    rs = ReplSet(*args, **kwargs)
    try:
        rs.start_mongos()
        rs.start_replset()
        yield rs
    finally:
        rs.cleanup()
def add_mongod_options(parser):
    """Register the mongod-related command-line options on ``parser``.

    Adds ``--port`` (int, default 9000), ``--mongod`` (string, default
    ``/usr/bin/mongod``) and a repeatable ``-v`` counter stored as
    ``verbose`` (default 0).

    Args:
        parser: An object exposing optparse's ``add_option`` method.
    """
    # (flags, keyword settings) pairs, registered in this order.
    option_specs = (
        (('--port',), dict(help='Base port to run mongod servers on',
                           action='store',
                           type='int',
                           default=9000)),
        (('--mongod',), dict(help='Path to mongod',
                             action='store',
                             type='string',
                             default='/usr/bin/mongod')),
        (('-v',), dict(dest='verbose',
                       help='Make the mongod more verbose',
                       action='count',
                       default=0)),
    )
    for flags, settings in option_specs:
        parser.add_option(*flags, **settings)
import os
import sys
import optparse
import time
import logging
from contextlib import contextmanager
import pymongo
import mongo
class FailoverTest(object):
    """Repeatedly force a replica-set failover and report how long it took.

    Args:
        args: Command-line argument list handed to ``parse_args``.
    """

    def __init__(self, args):
        self.parse_args(args)

    def parse_args(self, args):
        """Parse ``args`` and store the resulting options on ``self.options``.

        Accepts the shared mongod options plus ``--force-sync``. Positional
        leftovers are ignored.
        """
        option_parser = optparse.OptionParser("Usage: %prog [args]")
        mongo.add_mongod_options(option_parser)
        option_parser.add_option(
            '--force-sync',
            dest='force_sync',
            help='Perform a large number of writes immediately before syncing',
            action='store_true',
            default=False)
        parsed = option_parser.parse_args(args)
        self.options = parsed[0]

    def test_failover(self):
        """Ask the primary to step down and print the time to a new primary.

        With ``--force-sync``, 100 documents are first inserted with ``w=0``.
        Prints a message and returns early if the step-down command fails
        with ``OperationFailure``.
        """
        conn = self.mongo.connect_primary()
        admin_db = conn['admin']
        old_primary = admin_db.command({'isMaster': 1})['primary']

        if self.options.force_sync:
            # NOTE(review): this collection is 'test-failover' (hyphen) while
            # run() writes heartbeats to 'test_failover' (underscore) --
            # confirm whether the difference is intentional.
            scratch = conn['test']['test-failover']
            for n in range(100):
                scratch.insert({"i": n}, w=0)

        started = time.time()
        try:
            admin_db.command({'replSetStepDown': 1})
        except pymongo.errors.ConnectionFailure:
            # A dropped connection is treated as the step-down taking effect.
            print("Primary stepped down...")
        except pymongo.errors.OperationFailure as err:
            print("Couldn't fail over: %s" % (err,))
            return

        promoted = self.mongo.wait_primary(conn)
        finished = time.time()
        print("Failover (%s -> %s) in %.2fs." % (old_primary, promoted, finished - started))

    def run(self):
        """Start a replica set and loop forever: heartbeat, then fail over.

        Each cycle inserts one heartbeat document per second for 30 seconds
        and then calls ``test_failover``. The loop has no exit condition.
        """
        opts = self.options
        with mongo.replset(mongod=opts.mongod,
                           port=opts.port,
                           verbose=opts.verbose) as self.mongo:
            while True:
                conn = self.mongo.connect_primary()
                heartbeats = conn['test']['test_failover']
                for _ in range(30):
                    heartbeats.insert({"heartbeat": time.time()})
                    time.sleep(1)
                self.test_failover()
# Script entry point: build the test from the raw argument vector and run it.
if __name__ == '__main__':
    failover_test = FailoverTest(sys.argv)
    failover_test.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment