Skip to content

Instantly share code, notes, and snippets.

@ajdavis
Created March 27, 2012 03:23
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ajdavis/2212215 to your computer and use it in GitHub Desktop.
Save ajdavis/2212215 to your computer and use it in GitHub Desktop.
Testing connection behavior in PyMongo
import os
import shutil
import threading
import sys
import time
import pymongo
import re
# Two steps: 1. run the test. 2. parse the log.
###### 1. RUN THE TEST ###########
logpath = 'log'
datapath = 'data'
port = 6543
number_of_threads = 40
thread_lifetime_seconds = 20

# Remove leftovers from a previous run so the log parsed in step 2 only
# covers this run.
print('deleting %s and %s' % (logpath, datapath))
if os.path.exists(logpath):
    os.remove(logpath)

if os.path.exists(datapath):
    shutil.rmtree(datapath, ignore_errors=True)

os.makedirs(datapath)

# Start a private mongod on its own port, data directory and log file.
command = '/usr/local/mongo/bin/mongod --dbpath %s --logpath %s -v --fork --port %s' % (
    datapath, logpath, port
)
print(command)
os.system(command)

# Pass --auto_start_request on the command line to enable that option
auto_start_request = '--auto_start_request' in sys.argv
pymongo_version = pymongo.get_version_string()

# Try to connect for 10 seconds
connect_start = time.time()
while time.time() - connect_start < 10:
    try:
        if pymongo_version == '2.1.1+':
            cx = pymongo.Connection(port=port, auto_start_request=auto_start_request)
        else:
            # No auto_start_request argument
            cx = pymongo.Connection(port=port)
        break
    except Exception:
        # mongod not up yet: pause briefly instead of spinning, then retry
        time.sleep(0.1)
else:
    # The loop ran out of time without ever reaching `break`
    raise Exception("Couldn't connect")

if pymongo_version == '2.1.1':
    print('started pymongo 2.1.1')
else:
    print(' '.join([
        'Started',
        pymongo_version,
        ('with' if auto_start_request else 'without'),
        'auto_start_request'
    ]))

cx.test.test_collection.drop()

# Make 100k of data to query
one_kilobyte = 'a' * 1024
cx.test.test_collection.insert([{'a': one_kilobyte} for i in range(100)], safe=True)

# Discard the socket we used for inserting
cx.disconnect()
# Query for thread_lifetime_seconds seconds
def thread_run():
    """Query the test collection at about 10 requests per second.

    Runs thread_lifetime_seconds * 10 full-collection finds on the shared
    module-level connection ``cx``, then prints how long this thread ran and
    how long it has been since ``global_start``.

    Raises:
        AssertionError: if a find does not return exactly 100 documents.
    """
    start_time = time.time()
    for i in range(thread_lifetime_seconds * 10):
        # Run the query outside of an `assert` statement so the workload
        # still executes when Python is started with -O.
        documents = list(cx.test.test_collection.find())
        if len(documents) != 100:
            raise AssertionError(
                'expected 100 documents, found %d' % len(documents))

        # Targeting 10 requests per sec
        elapsed = time.time() - start_time
        if elapsed < i * 0.1:
            time.sleep(i * 0.1 - elapsed)

    now = time.time()
    elapsed = now - start_time
    total_elapsed = now - global_start
    print('thread elapsed %s total elapsed %s' % (elapsed, total_elapsed))
# Start one thread per second, number_of_threads in total. Each thread runs
# thread_lifetime_seconds * 10 finds at about 10 finds per second.
threads = []
global_start = time.time()
for thread_index in range(number_of_threads):
    t = threading.Thread(target=thread_run)
    t.start()
    threads.append(t)
    time.sleep(1)

for t in threads:
    t.join()

cx.disconnect()

# Let Mongo log disconnects
time.sleep(5)

# The pid is read from mongod.lock in the data directory; close the file
# promptly instead of leaking the handle.
with open(os.path.join(datapath, 'mongod.lock')) as lock_file:
    pid = int(lock_file.read())

print('Done after %s seconds, killing mongod with pid %s' % (
    time.time() - global_start, pid))
# Signal 9 is SIGKILL
os.kill(pid, 9)
##### 2. PARSE MONGOD LOG ########
# Read the whole log up front and close the file right away.
with open(logpath) as log_file:
    log = log_file.read()

# Running total of connections accepted, and the ids of those still open
connections_started = 0
open_connections = set()

# The first log line's timestamp is the reference point for the report
first_line = log.split('\n', 1)[0]
# An HH:MM:SS timestamp surrounded by spaces. The leading `.*` is greedy, so
# the last such timestamp on the line is the one captured.
time_pat = re.compile(r'.* (\d\d:\d\d:\d\d) .*')


def get_seconds(line):
    """Return the time of day found in *line*, as seconds since midnight.

    Args:
        line: one log line expected to contain an ' HH:MM:SS ' timestamp.

    Returns:
        The timestamp converted to an int number of seconds since midnight.

    Raises:
        AttributeError: if *line* contains no timestamp. The parsing loop
            relies on this to skip such lines.
    """
    match = time_pat.match(line)
    # `match` is None for lines without a timestamp; the attribute access
    # below then raises the AttributeError documented above.
    hours, minutes, secs = [int(part) for part in match.group(1).split(':')]
    return (hours * 60 + minutes) * 60 + secs
# map: second -> (open connections, connections started)
seconds = {}
first_seconds = get_seconds(first_line)
print('seconds,open connections,total connections opened')

# Compile the per-line patterns once, outside the loop
accepted_pat = re.compile(
    r'.*\[initandlisten\] connection accepted from [0-9\.:]+ #(\d+)')
ended_pat = re.compile(r'.*\[conn(\d+)\] end connection .*')
seconds_per_day = 24 * 60 * 60

# Parse each line in the log for opened and closed connections, store the last
# sample for each second into the 'seconds' dict
for line in log.split('\n'):
    match = accepted_pat.match(line)
    if match:
        connections_started += 1
        open_connections.add(match.group(1))

    match = ended_pat.match(line)
    if match:
        open_connections.discard(match.group(1))

    try:
        second = get_seconds(line)
    except AttributeError:
        # No timestamp on this line, so there is nothing to sample
        continue

    # Timestamps are seconds since midnight; wrap modulo one day so a run
    # that crosses midnight still gets non-negative offsets.
    offset = (second - first_seconds) % seconds_per_day
    seconds[offset] = (len(open_connections), connections_started)

# For each second, print the most recent sample. Carrying the last seen
# sample forward gives the same result as searching backward from every
# second, without rescanning earlier seconds.
opn, started = None, None
for second in range(max(seconds) + 1):
    if second in seconds:
        opn, started = seconds[second]
    print('%s,%s,%s' % (second, opn, started))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment