Created
July 23, 2012 07:57
-
-
Save jrgm/3162482 to your computer and use it in GitHub Desktop.
log-smtp-direct-rtt.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# This Source Code Form is subject to the terms of the Mozilla Public | |
# License, v. 2.0. If a copy of the MPL was not distributed with this | |
# file, You can obtain one at http://mozilla.org/MPL/2.0/. | |
# Check direct email delivery: send a message through the SMTP server on
# localhost:25 to a mailbox at http://restmail.net and time its arrival.
import base64 | |
import hashlib | |
import json | |
import optparse | |
import os | |
import re | |
import requests | |
import smtplib | |
import sys | |
import time | |
from datetime import datetime | |
from email.mime.text import MIMEText | |
from email.utils import parsedate_tz, mktime_tz | |
class RestMail(object):
    """Minimal client for one mailbox exposed over a restmail-style HTTP API.

    The mailbox URL is built once as ``<scheme>://<hostname>/mail/<username>``
    and stored in ``self.url``; ``get`` and ``delete`` issue the
    corresponding HTTP request against that URL.
    """

    def __init__(self, username, hostname='restmail.net', https=False,
                 timeout=None):
        """Build the mailbox URL.

        Args:
            username: Mailbox name, used as the last path segment.
            hostname: Host serving the mail API.
            https: Use the ``https`` scheme when true, ``http`` otherwise.
            timeout: Value forwarded as ``timeout`` to every ``requests``
                call. The default ``None`` keeps the library default of
                waiting indefinitely; pass a number of seconds to make a
                stalled server raise instead of hanging the caller.
        """
        scheme = 'https' if https else 'http'
        self.url = "%s://%s/mail/%s" % (scheme, hostname, username)
        self.timeout = timeout

    def get(self):
        """Return the response of an HTTP GET on the mailbox URL."""
        return requests.get(self.url, timeout=self.timeout)

    def delete(self):
        """Return the response of an HTTP DELETE on the mailbox URL."""
        return requests.delete(self.url, timeout=self.timeout)
def random_alphanum(length=16):
    """Return a random string of ``length`` ASCII letters and digits.

    Random bytes from ``os.urandom`` are hashed with SHA-512, the digest is
    base64-encoded, and the non-alphanumeric base64 characters (``+``, ``/``
    and the ``=`` padding) are dropped. Chunks are appended until enough
    characters are available.

    Args:
        length: Number of characters to return. Values of zero or less
            yield an empty string.

    Returns:
        A ``str`` of exactly ``length`` characters (empty for
        ``length <= 0``).
    """
    alphanum = ''
    while len(alphanum) < length:
        digest = hashlib.sha512(os.urandom(64)).digest()
        chunk = base64.b64encode(digest)
        if not isinstance(chunk, str):
            # On Python 3 b64encode returns bytes, which cannot be appended
            # to a str; base64 output is pure ASCII so decoding is lossless.
            chunk = chunk.decode('ascii')
        # Filter only the new chunk instead of re-scanning the whole string.
        alphanum += re.sub(r'[+/=]', '', chunk)
    return alphanum[:length]
def check_direct_email(hostname, timeout,
                       verbose_log='/home/ec2-user/log-smtp-direct-rtt.verbose'):
    """Send a test email through the local SMTP server and time its arrival.

    A message is sent via the SMTP server on ``localhost`` to a freshly
    generated random mailbox at restmail.net. The mailbox is then polled
    once per second until a non-empty result comes back or ``timeout``
    seconds have passed since sending. Progress and the outcome are printed
    to stdout; the raw mailbox response is also appended to ``verbose_log``.
    The mailbox is deleted once a message has been seen.

    Args:
        hostname: Label included in every log line. It is not used for any
            network connection.
        timeout: Maximum number of seconds to keep polling, counted from
            just before the SMTP connection is opened.
        verbose_log: Path of the file that receives the log line plus the
            raw mailbox response when a message is found.

    Returns:
        None. Results are reported through the log output only.
    """
    # Captured here rather than read from a module-level global, so the
    # function also works when imported and called from other code.
    time_started = time.time()
    username = random_alphanum()

    def log_message(msg, extra=None):
        """Print a UTC-timestamped line; append it and ``extra`` to the log."""
        fmt = '%Y-%m-%dT%H:%M:%S.'
        now = time.time()
        secs = datetime.utcfromtimestamp(int(now)).strftime(fmt)
        # Append the millisecond part of the current time.
        stamp = secs + '%03dZ' % (1000 * (now % 1))
        line = "[%s] (%s) (%s): %s" % (stamp, username, hostname, msg)
        # Parenthesised single argument prints identically on Python 2 and 3.
        print(line)
        if extra:
            with open(verbose_log, 'a') as f:
                f.write(line + '\n' + extra + '\n')

    log_message('*** started')

    recipient = '%s@restmail.net' % username
    sender = 'nobody@127.0.0.1'

    email = MIMEText('This is a test message')
    email['Subject'] = 'This is a test message'
    email['From'] = sender
    email['To'] = recipient

    # start the clock...
    time_sent = time.time()
    smtp = smtplib.SMTP('localhost')
    try:
        smtp.sendmail(sender, [recipient], email.as_string())
        smtp.quit()
    finally:
        # Releases the socket if sendmail/quit raised; harmless after quit().
        smtp.close()

    elapsed = 0
    # Fixed polling interval; the original growth factor was 1.0 (a no-op).
    sleep_time = 1.0
    restmail = RestMail(username)

    while time.time() - time_sent < timeout:
        log_message('** waiting')
        time.sleep(sleep_time)

        res = restmail.get()
        if res.status_code != 200 or res.text == '[]':
            continue

        log_message('** got 200 and non empty: ', res.text)
        elapsed = time.time() - time_sent
        # Time spent between function entry and the moment of sending.
        elapsed_start = time.time() - time_started - elapsed

        try:
            message = json.loads(res.text)[0]  # XXX so possible race here
            sent_at = mktime_tz(parsedate_tz(message['headers']['date']))
            if abs(time_sent - sent_at) > 2.0:
                log_message("The Date on the email makes no sense!: %s %s" %
                            (time_sent, sent_at))

            pattern = r'This is a test message'
            if not re.search(pattern, message['text'], re.M):
                log_message("Email found, but the message does not match")

            if elapsed > 0:
                # record overhead time and elapsed time from email sent
                log_message("direct SMTP delivery took (ov:%.1fs) %.1fs" %
                            (elapsed_start, elapsed))
        finally:
            # Always clear the mailbox, even if the checks above raised.
            restmail.delete()
        break
    else:
        # Reached only when the loop condition expired without a break.
        log_message("direct SMTP delivery timeout: %.1fs" % timeout)
if __name__ == "__main__":
    # Record the script start time in a module-level name, then run one
    # delivery check with a 70 second delivery timeout.
    time_started = time.time()
    check_direct_email(hostname='login.dev.anosrep.org', timeout=70)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment