Last active
August 29, 2015 14:22
-
-
Save thusoy/8a59e24da6fda103dd5d to your computer and use it in GitHub Desktop.
Check IMAP inbox for mail transit encryption
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Count percentage of mail in IMAP inbox delivered over TLS, based on | |
presence of ESMTPS header. | |
""" | |
from collections import defaultdict | |
from contextlib import contextmanager | |
from email.parser import HeaderParser | |
from itertools import chain, islice | |
import argparse | |
import getpass | |
import imaplib | |
import re | |
import sys | |
import os | |
# Matches the receiving host in a Received header value ("... by mx.example.com ...").
# "by" must start the value or follow whitespace; otherwise the tail of a longer
# word (e.g. a HELO name such as "shelby (") would be taken for the "by" clause
# and the real receiving host further along would never be examined.
_by_re = re.compile(r'(?:^|\s)by ([^ ]*)')
# Matches the sending host when the Received header value starts with "from <host>".
_from_re = re.compile(r'^from ([^ ]*)')
def main():
    """Tally inbox mail per sender by transit security and print the report.

    Each mail's Received headers are classified by ``get_transit_details``;
    mails for which it returns nothing are counted as skipped.
    """
    options = get_args()
    # Per-sender counters, keyed by whether the hop into the edge was secure.
    tallies = {True: defaultdict(int), False: defaultdict(int)}
    skipped = 0
    with get_imap_connection(options.email_address, options.host) as imap:
        for header in get_inbox_headers(imap, options.limit):
            details = get_transit_details(header, options.edges)
            if not details:
                skipped += 1
                continue
            origin, was_secure = details
            tallies[bool(was_secure)][origin] += 1
    print_report(tallies[True], tallies[False], skipped)
def get_args():
    """Parse the command line and derive the IMAP host and edge server list.

    Returns the argparse namespace with two extra attributes:

    * ``host``  -- the IMAP server to connect to (``--imap-server`` or the
      host part of the email address).
    * ``edges`` -- always a list of edge host names (``--edge-servers`` split
      on commas, or a one-element list with the host part of the address).

    Exits through ``parser.error`` if the email address has no host part.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('email_address')
    parser.add_argument('-l', '--limit', default=1000, help='How many mails to count. '
        'Default: %(default)s', type=int)
    parser.add_argument('-i', '--imap-server', help='IMAP server to connect to. Default '
        'is the host part of the email address.')
    parser.add_argument('-e', '--edge-servers', help='The SMTP server(s) considered to be the'
        ' edge, which receives the mail that has passed over the open Internet. We assume'
        ' internal networks are well-protected. Default: host part of email address.'
        ' You can separate hosts with a comma to specify multiple edges')
    args = parser.parse_args()

    # The host is whatever follows the last '@'; report a usage error instead
    # of a raw ValueError traceback when it is missing.
    _local_part, separator, mail_host = args.email_address.rpartition('@')
    if not separator or not mail_host:
        parser.error('email_address must be of the form user@host')

    args.host = args.imap_server or mail_host

    # Always hand out a list: with a bare string, a later `host in edges`
    # check would silently become a substring test.
    edges = []
    if args.edge_servers:
        edges = [edge.strip() for edge in args.edge_servers.split(',') if edge.strip()]
    args.edges = edges or [mail_host]
    return args
def print_report(secure_hosts, insecure_hosts, skip_count):
    """Print the per-sender breakdown and the overall percentage to stdout.

    ``secure_hosts`` and ``insecure_hosts`` map a sender name to a mail count;
    ``skip_count`` is the number of mails that could not be classified.
    The percentage line is omitted when no mail was classified, and the
    skipped line is omitted when nothing was skipped.
    """
    for secure_host in sorted(secure_hosts):
        print('Secure: %s' % secure_host)

    # Largest offenders first; the host name breaks ties so the output does
    # not depend on dict iteration order.
    ranked_insecure = sorted(insecure_hosts.items(), key=lambda item: (-item[1], item[0]))
    for insecure_host, count in ranked_insecure:
        print('%d non-confidential mail(s) from %s' % (count, insecure_host))

    secure_count = sum(secure_hosts.values())
    insecure_count = sum(insecure_hosts.values())
    total_mail = secure_count + insecure_count
    if total_mail:
        percentage_secure = float(secure_count) / total_mail * 100
        print('\nMail delivered confidentially: %.1f%%' % percentage_secure)
    if skip_count:
        print('%d mails skipped (edge not found in headers)' % skip_count)
@contextmanager
def get_imap_connection(email_address, host):
    """Context manager yielding a logged-in IMAP-over-SSL connection to ``host``.

    The password is read from the ``IMAP_PASSWORD`` environment variable, or
    prompted for with ``getpass`` when that is unset or empty. The connection
    is logged out on exit even if login or the managed block raises.
    """
    print('Connecting to %s as %s...' % (host, email_address))
    conn = imaplib.IMAP4_SSL(host)
    try:
        password = os.environ.get('IMAP_PASSWORD') or getpass.getpass()
        conn.login(email_address, password)
        yield conn
    finally:
        try:
            conn.close()
        except imaplib.IMAP4.error:
            # CLOSE is only legal while a mailbox is selected; if we got here
            # before (or without) a select there is simply nothing to close.
            pass
        conn.logout()
def get_transit_details(mail_header, transit_hop_targets):
    """Find the hop where a mail entered one of the edge servers.

    Walks the ``Received`` headers of ``mail_header`` in the order given by
    its ``items()`` and picks the first one whose "by" host is one of
    ``transit_hop_targets`` and whose "from" host is not.

    Returns ``(sender, secure)`` where ``sender`` is the last two
    dot-separated labels of the "from" host and ``secure`` is True when the
    header value contains ``ESMTPS``. Returns None when no such hop exists.
    """
    # Accept a single host name as well as a collection; testing `in` against
    # a bare string would be a substring match rather than a host match.
    if isinstance(transit_hop_targets, str):
        transit_hop_targets = [transit_hop_targets]

    for header, header_value in mail_header.items():
        if header.lower() != 'received':
            continue
        # Collapse folded header lines and runs of whitespace to single spaces.
        header_value = ' '.join(header_value.split())
        by_match = _by_re.search(header_value)
        if not by_match:
            continue
        host = by_match.groups()[0]
        if not host or host not in transit_hop_targets:
            continue
        sender_match = _from_re.match(header_value)
        if not sender_match:
            # No leading "from" clause on this hop, so there is no sender to report.
            continue
        sender = sender_match.groups()[0]
        if sender in transit_hop_targets:
            # Edge-to-edge hop; keep looking for the hop that came from outside.
            continue
        secure = 'ESMTPS' in header_value
        toplevel_sender = '.'.join(sender.split('.')[-2:])
        return toplevel_sender, secure
    return None
def get_inbox_headers(connection, limit, chunk_size=100):
    """Yield the parsed Received headers of the newest ``limit`` inbox mails.

    Headers are fetched ``chunk_size`` messages per FETCH command, newest
    message first. Prints a message and exits with status 1 if selecting,
    searching or fetching does not return ``OK``.
    """
    # Read-only select: this tool only peeks at headers, and a read-write
    # select would let the later CLOSE expunge messages flagged \Deleted.
    status, _select_data = connection.select(readonly=True)
    if status != 'OK':
        print('Selecting inbox failed: %s' % status)
        sys.exit(1)

    status, raw_msg_ids = connection.search(None, 'ALL')
    if status != 'OK':
        print('Fetching inbox failed: %s' % status)
        sys.exit(1)

    # The ids arrive as one space-separated blob; it may be missing (None)
    # when the server sends no data, and is bytes on Python 3.
    id_blob = raw_msg_ids[0] or ''
    if not isinstance(id_blob, str):
        id_blob = id_blob.decode('ascii')

    # Iterate backwards, ie. newest first
    all_msg_ids = id_blob.split()[-1:-1-limit:-1]
    for msg_ids in chunked(all_msg_ids, chunk_size):
        status, raw_header_chunk = connection.fetch(','.join(msg_ids),
            '(BODY.PEEK[HEADER.FIELDS (RECEIVED)])')
        if status != 'OK':
            print('Fetching headers of msgs %s failed' % msg_ids)
            sys.exit(1)
        for header in parse_headers_from_chunk(raw_header_chunk):
            yield header
def parse_headers_from_chunk(raw_header_chunk):
    """Yield one parsed header object per message in a FETCH response.

    ``raw_header_chunk`` is the data list returned by ``connection.fetch``.
    Only tuple items are parsed, taking element 1 as the raw header text;
    everything else in the list is skipped.
    """
    header_parser = HeaderParser()
    for raw_header in raw_header_chunk:
        # Skip anything that is not an (envelope, data) pair: the closing
        # ')' the original special-cased, None, or other trailing text.
        # Indexing those with [1] would misparse a single character or crash.
        if not isinstance(raw_header, tuple):
            continue
        header_data = raw_header[1]
        if not isinstance(header_data, str):
            # Header data that is not already text (bytes) must be decoded
            # before parsestr() can handle it.
            header_data = header_data.decode('utf-8', 'replace')
        yield header_parser.parsestr(header_data)
def chunked(seq, chunk_size):
    """Yield lists of up to ``chunk_size`` consecutive items from ``seq``.

    Every list is full except possibly the last; nothing is yielded for an
    empty input.
    """
    for chunk in _ichunked(seq, chunk_size):
        yield list(chunk)


def _ichunked(sequence, chunk_size):
    """Yield lazy iterables of up to ``chunk_size`` items from ``sequence``.

    All chunks draw from one shared iterator, so each chunk must be fully
    consumed before the next one is requested (``chunked`` does this).
    """
    it = iter(sequence)
    while True:
        # Pull the first item eagerly so exhaustion is detected here. End the
        # generator explicitly: letting StopIteration escape a generator is a
        # RuntimeError under PEP 479.
        try:
            first = next(it)
        except StopIteration:
            return
        yield chain([first], islice(it, chunk_size - 1))
# Run the report only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment