Created
March 27, 2012 03:23
-
-
Save ajdavis/2212215 to your computer and use it in GitHub Desktop.
Testing connection behavior in PyMongo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import re
import shutil
import signal
import sys
import threading
import time

import pymongo
# Two steps: 1. run the test. 2. parse the log.

###### 1. RUN THE TEST ###########

# Scratch locations for the throwaway mongod; both are wiped on every run.
logpath = 'log'
datapath = 'data'
# Port the throwaway mongod listens on.
port = 6543
# One worker thread is started per second until this many have been started.
number_of_threads = 40
# Each worker targets 10 queries per second for this many seconds.
thread_lifetime_seconds = 20

print('deleting %s and %s' % (logpath, datapath))
if os.path.exists(logpath):
    os.remove(logpath)

if os.path.exists(datapath):
    shutil.rmtree(datapath, ignore_errors=True)

os.makedirs(datapath)

command = '/usr/local/mongo/bin/mongod --dbpath %s --logpath %s -v --fork --port %s' % (
    datapath, logpath, port
)
print(command)

# Fail fast when the launch command reports an error instead of waiting for
# the connection attempts that follow to time out.
exit_status = os.system(command)
if exit_status != 0:
    raise Exception(
        'mongod failed to start (exit status %s): %s' % (exit_status, command))
# Command-line switch: pass auto_start_request=True to the Connection when the
# installed PyMongo build is the one that accepts it.
auto_start_request = '--auto_start_request' in sys.argv

# Looked up once so the connection code and the banner agree on the version.
pymongo_version = pymongo.get_version_string()

# Try to connect for 10 seconds
connect_start = time.time()
last_error = None
while time.time() - connect_start < 10:
    try:
        if pymongo_version == '2.1.1+':
            cx = pymongo.Connection(
                port=port, auto_start_request=auto_start_request)
        else:
            # No auto_start_request argument
            cx = pymongo.Connection(port=port)
        break
    except Exception as exc:
        # mongod not up yet, keep trying. Remember the failure so it can be
        # reported if we give up, and pause briefly rather than spinning.
        last_error = exc
        time.sleep(0.1)
else:
    # The loop ran out of time without ever reaching `break`.
    raise Exception("Couldn't connect: %s" % (last_error,))

# NOTE(review): for any version other than '2.1.1' and '2.1.1+' the banner
# below reports the command-line flag even though auto_start_request was not
# passed to the Connection above -- confirm whether other versions matter.
if pymongo_version == '2.1.1':
    print('started pymongo 2.1.1')
else:
    print(' '.join([
        'Started',
        pymongo_version,
        ('with' if auto_start_request else 'without'),
        'auto_start_request',
    ]))
# Start from an empty collection, then seed it with 100 documents of 1 KiB
# each (about 100 KiB in total) for the worker threads to query.
collection = cx.test.test_collection
collection.drop()

one_kilobyte = 'a' * 1024
documents = [{'a': one_kilobyte} for _ in range(100)]
collection.insert(documents, safe=True)

# Discard the socket we used for inserting
cx.disconnect()
# Query for thread_lifetime_seconds seconds
def thread_run():
    """Worker body: query the test collection at a paced rate, then report.

    Runs ``thread_lifetime_seconds * 10`` iterations. Each iteration fetches
    every document in ``test.test_collection`` through the shared connection
    ``cx`` and then sleeps as needed so that iteration ``i`` does not finish
    before ``i * 0.1`` seconds have elapsed (about 10 requests per second).
    When done, prints this thread's elapsed time and the time elapsed since
    ``global_start``.

    Raises:
        AssertionError: if a query does not return exactly 100 documents.
    """
    start_time = time.time()
    for i in range(thread_lifetime_seconds * 10):
        # Run the query outside of an `assert` statement: asserts are
        # stripped under `python -O`, which would skip the query entirely.
        found = len(list(cx.test.test_collection.find()))
        if found != 100:
            raise AssertionError('expected 100 documents, found %s' % found)

        # Targeting 10 requests per sec
        elapsed = time.time() - start_time
        if elapsed < i * 0.1:
            time.sleep(i * 0.1 - elapsed)

    now = time.time()
    elapsed = now - start_time
    total_elapsed = now - global_start
    print('thread elapsed %s total elapsed %s' % (elapsed, total_elapsed))
# Start 1 thread per second until number_of_threads have been started; each
# one runs thread_run() to completion.
threads = []
global_start = time.time()
for thread_index in range(number_of_threads):
    t = threading.Thread(target=thread_run)
    t.start()
    threads.append(t)
    time.sleep(1)

for t in threads:
    t.join()

cx.disconnect()

# Let Mongo log disconnects
time.sleep(5)

# The lock file in the data directory is read as mongod's process id.
with open(os.path.join(datapath, 'mongod.lock')) as lock_file:
    pid = int(lock_file.read())

print('Done after %s seconds, killing mongod with pid %s' % (
    time.time() - global_start, pid))
os.kill(pid, signal.SIGKILL)
##### 2. PARSE MONGOD LOG ########

# Read the whole log, closing the file handle as soon as the read is done.
with open(logpath) as log_file:
    log = log_file.read()

# Running totals maintained while the log is scanned.
connections_started = 0
open_connections = set()

# The first log line supplies the time origin for the report.
first_line = log.split('\n', 1)[0]
# Matches a space-delimited HH:MM:SS timestamp anywhere in a line. The leading
# '.*' is greedy, so the last such timestamp on the line is the one captured.
time_pat = re.compile(r'.* (\d\d:\d\d:\d\d) .*')


# Seconds since midnight
def get_seconds(line):
    """Return the HH:MM:SS timestamp found in *line* as seconds since midnight.

    Args:
        line: one line of log text.

    Returns:
        An int in the range 0..86399 for a valid time of day.

    Raises:
        AttributeError: if *line* contains no space-delimited HH:MM:SS
            timestamp. Callers use this exception to skip such lines.
    """
    match = time_pat.match(line)
    if match is None:
        raise AttributeError('no HH:MM:SS timestamp in line: %r' % (line,))

    hours, minutes, secs = [int(part) for part in match.group(1).split(':')]
    return (hours * 60 + minutes) * 60 + secs
# map: second -> (open connections, connections started)
seconds = {}
first_seconds = get_seconds(first_line)
print('seconds,open connections,total connections opened')

# Compiled once, outside the per-line loop.
accepted_pat = re.compile(
    r'.*\[initandlisten\] connection accepted from [0-9\.:]+ #(\d+)')
ended_pat = re.compile(r'.*\[conn(\d+)\] end connection .*')
seconds_per_day = 24 * 60 * 60

# Parse each line in the log for opened and closed connections, store the last
# sample for each second into the 'seconds' dict
for line in log.split('\n'):
    match = accepted_pat.match(line)
    if match:
        connections_started += 1
        open_connections.add(match.group(1))

    match = ended_pat.match(line)
    if match:
        open_connections.discard(match.group(1))

    try:
        second = get_seconds(line)
    except AttributeError:
        # No timestamp on this line, so there is nothing to sample.
        continue

    # get_seconds() is a time of day; the modulo keeps offsets non-negative
    # when the log runs past midnight.
    offset = (second - first_seconds) % seconds_per_day
    seconds[offset] = (len(open_connections), connections_started)
# For each second from 0 through the last sampled second, print the most
# recent sample at or before it. The latest sample is carried forward through
# seconds that have none, instead of searching backward for every second.
opn, started = None, None
last_second = max(seconds) if seconds else -1
for second in range(last_second + 1):
    if second in seconds:
        opn, started = seconds[second]
    print('%s,%s,%s' % (second, opn, started))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment