Created
July 23, 2012 08:03
-
-
Save jrgm/3162498 to your computer and use it in GitHub Desktop.
log-smtp-rtt.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# check delivery of stage_user email to http://restmail.net
import base64
import hashlib
import json
import optparse
import os
import re
import smtplib
import sys
import time
from datetime import datetime
from email.mime.text import MIMEText
from email.utils import parsedate_tz, mktime_tz

import requests
class RestMail(object):
    """Tiny HTTP client for a single mailbox URL.

    The mailbox lives at ``<scheme>://<hostname>/mail/<username>``; ``get``
    and ``delete`` issue the matching HTTP verb against that URL.
    """

    def __init__(self, username, hostname='restmail.net', https=False,
                 timeout=None):
        """Build the mailbox URL.

        Args:
            username: mailbox name, used as the last path segment.
            hostname: host serving the mail API.
            https: use the ``https`` scheme when true, ``http`` otherwise.
            timeout: value forwarded as ``timeout`` to every request.  The
                default ``None`` keeps the previous behaviour (no limit);
                pass a number of seconds so that a stalled connection
                cannot block the caller indefinitely.
        """
        scheme = 'https' if https else 'http'
        self.url = "%s://%s/mail/%s" % (scheme, hostname, username)
        self.timeout = timeout

    def get(self):
        """GET the mailbox URL and return the ``requests`` response."""
        return requests.get(self.url, timeout=self.timeout)

    def delete(self):
        """DELETE the mailbox URL and return the ``requests`` response."""
        return requests.delete(self.url, timeout=self.timeout)
def random_alphanum(length=16):
    """Return a random string of ``length`` ASCII letters and digits.

    Random bytes from ``os.urandom`` are hashed with SHA-512, base64
    encoded, and stripped of the non-alphanumeric base64 characters
    (``+``, ``/`` and the ``=`` padding).  Chunks are appended until at
    least ``length`` characters are available.

    Args:
        length: number of characters wanted; ``0`` yields ``''``.

    Returns:
        A native ``str`` of exactly ``length`` characters.
    """
    alphanum = ''
    while len(alphanum) < length:
        digest = hashlib.sha512(os.urandom(64)).digest()
        chunk = base64.b64encode(digest)
        if not isinstance(chunk, str):
            # Python 3: b64encode returns bytes, which cannot be appended
            # to a str; base64 output is always plain ASCII.
            chunk = chunk.decode('ascii')
        alphanum += re.sub(r'[+/=]', '', chunk)
    return alphanum[:length]
def check_stage_user_email(hostname='login.dev.anosrep.org', timeout=70.0):
    """Stage a throwaway user on ``hostname`` and time the resulting email.

    Steps:
      1. GET ``https://<hostname>/wsapi/session_context`` for a CSRF token.
      2. POST ``https://<hostname>/wsapi/stage_user`` with a random
         ``<user>@restmail.net`` address and password.
      3. Poll that restmail mailbox once per second until a message is
         present or ``timeout`` seconds have passed since the POST.

    Every step is reported on stdout; the raw mailbox response is also
    appended to ``/home/ec2-user/log-smtp-rtt.verbose``.  The mailbox is
    deleted once a message has been found.

    Args:
        hostname: host whose ``stage_user`` email delivery is measured.
        timeout: maximum number of seconds to poll.  Anything accepted by
            ``float()`` works, so a raw command-line string is fine.

    Raises:
        SystemExit: with status 1 when either wsapi call does not answer
            with HTTP 200.
    """
    # A str timeout (as delivered by an untyped optparse option) compares
    # greater than any float on Python 2, so the loop would never time out,
    # and it breaks the "%.1f" formatting below.
    timeout = float(timeout)
    # The script entry point stores its start time in the module-level
    # ``time_started``; fall back to "now" so the function also works when
    # imported and called directly.
    started = globals().get('time_started', time.time())
    username, password = random_alphanum(), random_alphanum()

    def log_message(msg, extra=None):
        """Print a UTC-timestamped line; also log ``extra`` to the verbose file."""
        now = time.time()
        stamp = datetime.utcfromtimestamp(int(now)).strftime(
            '%Y-%m-%dT%H:%M:%S.')
        stamp += '%03dZ' % (1000 * (now % 1))
        line = "[%s] (%s) (%s): %s" % (stamp, username, hostname, msg)
        print(line)
        if extra:
            with open('/home/ec2-user/log-smtp-rtt.verbose', 'a') as f:
                f.write(line + '\n' + extra + '\n')

    log_message('*** started')
    session = requests.session()
    res = session.get('https://%s/wsapi/session_context' % hostname)
    if res.status_code != 200:
        log_message("Bad response session_context: %s" % res.status_code)
        sys.exit(1)
    ctx = json.loads(res.text)

    post_data = {
        'csrf': ctx['csrf_token'],
        'email': '%s@restmail.net' % username,
        'pass': password,
        'site': 'http://example.com',
    }
    headers = {
        'Content-Type': 'application/json',
    }

    # start the clock...
    time_sent = time.time()
    res = session.post('https://%s/wsapi/stage_user' % hostname,
                       data=json.dumps(post_data), headers=headers)
    if res.status_code != 200:
        log_message("Bad response stage_user: %s on /wsapi/stage_user" %
                    res.status_code)
        # A failed check must not report success to the calling shell.
        sys.exit(1)

    sleep_time = 1.0
    restmail = RestMail(username)
    # Escape the hostname so its dots only match literal dots.
    pattern = r'https://%s/verify_email_address' % re.escape(hostname)

    while time.time() - time_sent < timeout:
        log_message('** waiting')
        time.sleep(sleep_time)
        res = restmail.get()
        if res.status_code != 200 or res.text == '[]':
            continue

        log_message('** got 200 and non empty: ', res.text)
        elapsed = time.time() - time_sent
        # Overhead: time spent before the stage_user POST was sent.
        elapsed_start = time_sent - started
        try:
            message = json.loads(res.text)[0]  # XXX so possible race here
            sent_at = mktime_tz(parsedate_tz(message['headers']['date']))
            if abs(time_sent - sent_at) > 2.0:
                log_message("The Date on the email makes no sense!: %s %s" %
                            (time_sent, sent_at))
            if not re.search(pattern, message['text'], re.M):
                log_message(
                    "Email found, but missing verify_email_address url")
            if elapsed > 0:
                # record overhead time and elapsed time from email sent
                log_message("stage_user SMTP delivery took (ov:%.1fs) %.1fs"
                            % (elapsed_start, elapsed))
        finally:
            # Always clean the mailbox, even if the message was malformed.
            restmail.delete()
        break
    else:
        # Loop ended without ``break``: nothing arrived in time.
        log_message("stage_user SMTP delivery timeout: %.1fs" % timeout)
if __name__ == "__main__":
    # Script start time, kept at module level because
    # check_stage_user_email reads it to report the overhead spent before
    # the stage_user request is sent.
    time_started = time.time()
    desc = "check delivery of stage_user email to http://restmail.net"
    parser = optparse.OptionParser(description=desc)
    parser.add_option("-H", "--hostname", action="store", dest="hostname",
                      default="login.dev.anosrep.org",
                      help="hostname (default 'login.dev.anosrep.org')")
    # type="float" makes optparse validate and convert the value; without
    # it a "-t 30" argument reaches the polling loop as the string "30".
    parser.add_option("-t", "--timeout", action="store", dest="timeout",
                      type="float", default=70.0,
                      help="Max. time to poll for email arrival (default 70s)")
    (options, args) = parser.parse_args()
    check_stage_user_email(hostname=options.hostname, timeout=options.timeout)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment