Skip to content

Instantly share code, notes, and snippets.

@jrgm
Created July 23, 2012 08:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jrgm/3162498 to your computer and use it in GitHub Desktop.
Save jrgm/3162498 to your computer and use it in GitHub Desktop.
log-smtp-rtt.py
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# check delivery of stage_user email to http://restmail.net
import base64
import hashlib
import json
import optparse
import os
import re
import requests
import smtplib
import sys
import time
from datetime import datetime
from email.mime.text import MIMEText
from email.utils import parsedate_tz, mktime_tz
class RestMail(object):
def __init__(self, username, hostname='restmail.net', https=False):
scheme = 'https' if https else 'http'
self.url = "%s://%s/mail/%s" % (scheme, hostname, username)
def get(self):
return requests.get(self.url)
def delete(self):
return requests.delete(self.url)
def random_alphanum(length=16):
alphanum = ''
while len(alphanum) < length:
bites = hashlib.sha512(os.urandom(64))
alphanum += base64.b64encode(bites.digest())
alphanum = re.sub(u'\+|=|\/', '', alphanum)
return alphanum[:length]
def check_stage_user_email(hostname='login.dev.anosrep.org', timeout=70.0):
(username, password) = (random_alphanum(), random_alphanum())
def log_message(msg, extra=None):
fmt = '%Y-%m-%dT%H:%M:%S.'
now = time.time()
secs = datetime.utcfromtimestamp(int(now)).strftime(fmt)
now = secs + '%03dZ' % (1000 * (now % 1))
msg = "[%s] (%s) (%s): %s" % (now, username, hostname, msg)
print msg
if extra:
f = open('/home/ec2-user/log-smtp-rtt.verbose', 'a')
f.write(msg + '\n' + extra + '\n')
f.close()
log_message('*** started')
session = requests.session()
res = session.get('https://%s/wsapi/session_context' % hostname)
if res.status_code != 200:
msg = "Bad response session_context: %s" % res.status_code
log_message(msg)
sys.exit(1)
ctx = json.loads(res.text)
post_data = {
'csrf': ctx['csrf_token'],
'email': '%s@restmail.net' % username,
'pass': password,
'site': 'http://example.com'
}
headers = {
'Content-Type': 'application/json',
}
# start the clock...
time_sent = time.time()
res = session.post('https://%s/wsapi/stage_user' % hostname,
data=json.dumps(post_data), headers=headers)
if res.status_code != 200:
log_message("Bad response stage_user: %s on /wsapi/stage_user" %
res.status_code)
sys.exit()
elapsed = 0
sleep_time = 1.0 # 0.5
while time.time() - time_sent < timeout:
log_message('** waiting')
time.sleep(sleep_time)
sleep_time *= 1.0 # 1.8
restmail = RestMail(username)
res = restmail.get()
if res.status_code == 200 and res.text != '[]':
log_message('** got 200 and non empty: ', res.text)
elapsed = time.time() - time_sent
elapsed_start = time.time() - time_started - elapsed
try:
message = json.loads(res.text)[0] # XXX so possible race here
sent_at = mktime_tz(parsedate_tz(message['headers']['date']))
if abs(time_sent - sent_at) > 2.0:
msg = ("The Date on the email makes no sense!: %s %s" %
(time_sent, sent_at))
log_message(msg)
pattern = r'https://%s/verify_email_address' % hostname
if not re.search(pattern, message['text'], re.M):
msg = "Email found, but missing verify_email_address url"
log_message(msg)
if elapsed > 0:
# record overhead time and elapsed time from email sent
log_message("stage_user SMTP delivery took (ov:%.1fs) %.1fs" % (elapsed_start, elapsed))
finally:
restmail.delete()
break
else:
log_message("stage_user SMTP delivery timeout: %.1fs" % timeout)
if __name__ == "__main__":
time_started = time.time()
desc = "check delivery of stage_user email to http://restmail.net"
parser = optparse.OptionParser(description=desc)
parser.add_option("-H", "--hostname", action="store", dest="hostname",
default="login.dev.anosrep.org",
help="hostname (default 'login.dev.anosrep.org')")
parser.add_option("-t", "--timeout", action="store", dest="timeout",
default=70,
help="Max. time to poll for email arrival (default 70s)")
(options, args) = parser.parse_args()
check_stage_user_email(hostname=options.hostname, timeout=options.timeout)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment