Skip to content

Instantly share code, notes, and snippets.

@thusoy
Last active August 29, 2015 14:22
Show Gist options
  • Save thusoy/8a59e24da6fda103dd5d to your computer and use it in GitHub Desktop.
Save thusoy/8a59e24da6fda103dd5d to your computer and use it in GitHub Desktop.
Check IMAP inbox for mail transit encryption
#!/usr/bin/env python
"""
Count percentage of mail in IMAP inbox delivered over TLS, based on
presence of ESMTPS header.
"""
from collections import defaultdict
from contextlib import contextmanager
from email.parser import HeaderParser
from itertools import chain, islice
import argparse
import getpass
import imaplib
import re
import sys
import os
# Captures the run of non-space characters that follows the first "by "
# found anywhere in a (whitespace-normalised) Received header value.
_by_re = re.compile(r'by ([^ ]*)')
# Captures the run of non-space characters that follows a "from " located at
# the very start of a Received header value.
_from_re = re.compile(r'^from ([^ ]*)')
def main():
    """Tally inbox mails by sending host and transport security, then report.

    Reads the command line, walks the Received headers of the newest mails
    in the inbox and counts, per sending host, how many mails reached the
    edge server with and without TLS. Mails for which no transit details
    could be determined are counted as skipped.
    """
    args = get_args()
    secure_hosts = defaultdict(int)
    insecure_hosts = defaultdict(int)
    # Select the tally to bump based on the "secure" flag of a mail.
    tally_for = {True: secure_hosts, False: insecure_hosts}
    skip_count = 0

    with get_imap_connection(args.email_address, args.host) as conn:
        for mail_header in get_inbox_headers(conn, args.limit):
            transit_details = get_transit_details(mail_header, args.edges)
            if not transit_details:
                skip_count += 1
                continue
            sender, secure = transit_details
            tally_for[bool(secure)][sender] += 1

    print_report(secure_hosts, insecure_hosts, skip_count)
def get_args():
    """Parse the command line and derive the IMAP host and edge servers.

    Returns the ``argparse`` namespace extended with two attributes:

    * ``host`` -- IMAP server to connect to: ``--imap-server`` if given,
      otherwise the host part of the email address.
    * ``edges`` -- list of edge SMTP server names: ``--edge-servers`` split
      on commas (surrounding whitespace and empty entries dropped), otherwise
      a one-element list holding the host part of the email address.

    Exits with a usage error if the email address has no ``@host`` part.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('email_address')
    parser.add_argument('-l', '--limit', default=1000, help='How many mails to count. '
        'Default: %(default)s', type=int)
    parser.add_argument('-i', '--imap-server', help='IMAP server to connect to. Default '
        'is the host part of the email address.')
    parser.add_argument('-e', '--edge-servers', help='The SMTP server(s) considered to be the'
        ' edge, which receives the mail that has passed over the open Internet. We assume'
        ' internal networks are well-protected. Default: host part of email address.'
        ' You can separate hosts with a comma to specify multiple edges')
    args = parser.parse_args()

    # The host is whatever follows the last '@'; report a proper usage error
    # instead of a ValueError traceback when it is missing.
    _, at_sign, mail_host = args.email_address.rpartition('@')
    if not at_sign or not mail_host:
        parser.error('email_address must be of the form user@host')

    args.host = args.imap_server or mail_host

    # Always produce a list: callers test ``host in edges`` and a bare string
    # would turn that membership test into a substring match.
    edges = []
    if args.edge_servers:
        edges = [edge.strip() for edge in args.edge_servers.split(',') if edge.strip()]
    args.edges = edges or [mail_host]
    return args
def print_report(secure_hosts, insecure_hosts, skip_count):
    """Print a summary of which hosts delivered mail with and without TLS.

    ``secure_hosts`` and ``insecure_hosts`` map a sending host name to the
    number of mails counted for it. ``skip_count`` is the number of mails
    that could not be classified.

    Secure hosts are listed alphabetically. Insecure hosts are listed by
    descending mail count, with the host name as tie-breaker so the output
    is deterministic. The percentage line is omitted when no mail was
    classified, and the skipped line when nothing was skipped.
    """
    # Single-argument print(...) behaves identically as a Python 2 print
    # statement and as a Python 3 function call.
    for secure_host in sorted(secure_hosts):
        print('Secure: %s' % secure_host)

    by_count_then_name = sorted(insecure_hosts.items(), key=lambda t: (-t[1], t[0]))
    for insecure_host, count in by_count_then_name:
        print('%d non-confidential mail(s) from %s' % (count, insecure_host))

    secure_count = sum(secure_hosts.values())
    insecure_count = sum(insecure_hosts.values())
    total_mail = secure_count + insecure_count
    if total_mail:
        # float() keeps this a true division on Python 2 as well.
        percentage_secure = float(secure_count)/total_mail*100
        print('\nMail delivered confidentially: %.1f%%' % percentage_secure)
    if skip_count:
        print('%d mails skipped (edge not found in headers)' % skip_count)
@contextmanager
def get_imap_connection(email_address, host):
    """Context manager yielding a logged-in IMAP-over-SSL connection.

    Connects to ``host``, logs in as ``email_address`` using the password
    from the ``IMAP_PASSWORD`` environment variable (prompting for it when
    the variable is unset or empty) and yields the connection.

    The connection is always shut down on exit, including when the login or
    the body of the ``with`` block raises.
    """
    print('Connecting to %s as %s...' % (host, email_address))
    conn = imaplib.IMAP4_SSL(host)
    try:
        password = os.environ.get('IMAP_PASSWORD') or getpass.getpass()
        conn.login(email_address, password)
        yield conn
    finally:
        try:
            # CLOSE is only valid while a mailbox is selected; if none was
            # (e.g. login failed) imaplib raises, which must not prevent
            # the logout below.
            conn.close()
        except imaplib.IMAP4.error:
            pass
        conn.logout()
def get_transit_details(mail_header, transit_hop_targets):
    """Find the hop where a mail entered one of the edge servers.

    ``mail_header`` is a parsed header object exposing ``items()`` as
    (name, value) pairs. ``transit_hop_targets`` is a collection of edge
    server host names; a single host name given as a plain string is
    accepted as well.

    The Received headers are scanned in order. The first one that was
    recorded "by" an edge server and came "from" a host that is not itself
    an edge server decides the result.

    Returns a ``(toplevel_sender, secure)`` tuple, where ``toplevel_sender``
    is the last two dot-separated labels of the sending host and ``secure``
    tells whether the header value contains ``ESMTPS``. Returns ``None``
    when no such hop is found.
    """
    # A bare string would make the ``in`` tests below substring matches.
    if isinstance(transit_hop_targets, str):
        transit_hop_targets = [transit_hop_targets]

    for header, header_value in mail_header.items():
        # Header field names are case-insensitive.
        if header.lower() != 'received':
            continue
        # Collapse folded lines and repeated whitespace into single spaces.
        header_value = ' '.join(header_value.split())
        by_match = _by_re.search(header_value)
        if not by_match:
            continue
        host = by_match.group(1)
        if not host or host not in transit_hop_targets:
            continue
        sender_match = _from_re.match(header_value)
        if not sender_match:
            # Received header without a leading "from" clause: there is no
            # sending host to attribute this hop to.
            continue
        sender = sender_match.group(1)
        if sender in transit_hop_targets:
            # Hop between two edge servers, not the entry from outside.
            continue
        secure = 'ESMTPS' in header_value
        toplevel_sender = '.'.join(sender.split('.')[-2:])
        return toplevel_sender, secure
    return None
def get_inbox_headers(connection, limit, chunk_size=100):
    """Yield the parsed Received headers of the newest mails in the mailbox.

    Selects the connection's default mailbox, searches for all messages and
    fetches the Received header fields of the ``limit`` newest ones in
    batches of ``chunk_size`` messages, without marking them as read (PEEK).

    Prints a message and exits the process with status 1 when the search or
    a fetch does not return ``OK``.
    """
    connection.select()
    status, raw_msg_ids = connection.search(None, 'ALL')
    if status != 'OK':
        print('Fetching inbox failed: %s' % status)
        sys.exit(1)

    # Walk the id list backwards so the newest messages come first.
    newest_first = raw_msg_ids[0].split()[-1:-1-limit:-1]
    received_only = '(BODY.PEEK[HEADER.FIELDS (RECEIVED)])'

    for msg_ids in chunked(newest_first, chunk_size):
        message_set = ','.join(msg_ids)
        status, raw_header_chunk = connection.fetch(message_set, received_only)
        if status != 'OK':
            print('Fetching headers of msgs %s failed' % msg_ids)
            sys.exit(1)
        for parsed_header in parse_headers_from_chunk(raw_header_chunk):
            yield parsed_header
def parse_headers_from_chunk(raw_header_chunk):
    """Yield one parsed header object per message in a FETCH response.

    ``raw_header_chunk`` is the data list returned by ``imaplib``'s
    ``fetch``. Items that are tuples carry the header text as their second
    element; every other item (such as the closing ``')'``) is response
    framing and is skipped.
    """
    header_parser = HeaderParser()
    for raw_header in raw_header_chunk:
        # Only tuple items hold literal data. Skipping by type rather than
        # by comparing against ')' also covers bytes framing and trailing
        # attribute strings.
        if not isinstance(raw_header, tuple):
            continue
        header_data = raw_header[1]
        if not isinstance(header_data, str):
            # Python 3 imaplib hands back bytes; parsestr() needs text.
            header_data = header_data.decode('utf-8', 'replace')
        yield header_parser.parsestr(header_data)
def chunked(seq, chunk_size):
    """Yield the items of an iterable as lists of up to ``chunk_size`` items.

    Every list holds ``chunk_size`` items except possibly the last one.
    An empty iterable yields nothing.
    """
    for chunk in _ichunked(seq, chunk_size):
        yield list(chunk)


def _ichunked(sequence, chunk_size):
    """Yield the items of an iterable as lazy chunks of ``chunk_size`` items.

    All chunks draw from one shared iterator, so each chunk must be fully
    consumed before the next one is requested.
    """
    it = iter(sequence)
    while True:
        # Pull the first item eagerly so exhaustion is detected here. The
        # StopIteration must be caught explicitly: letting it escape a
        # generator is an error under PEP 479, and the next() builtin works
        # on both Python 2 and 3, unlike the .next() method.
        try:
            first = next(it)
        except StopIteration:
            return
        yield chain([first], islice(it, chunk_size - 1))
# Run the report only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment