Skip to content

Instantly share code, notes, and snippets.

@JosephRedfern
Created February 18, 2020 19:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JosephRedfern/79314fdf1875166e08489746029851c2 to your computer and use it in GitHub Desktop.
Save JosephRedfern/79314fdf1875166e08489746029851c2 to your computer and use it in GitHub Desktop.
import imaplib
import re
import random
from email.parser import BytesHeaderParser
from typing import List
import tqdm
class EmailMiner:
    """
    Mine an IMAP inbox for the mail servers named in ``Received`` headers.

    Based on method/code described here: https://obem.be/2020/02/18/mining-my-mailbox-for-top-email-service-providers.html
    """

    # Both patterns are applied with ``search`` rather than ``match``: the
    # interesting clause usually sits in the middle of a Received header
    # ("from a.example (EHLO b.example) by c.example with ..."), so anchoring
    # at the start of the value would almost never find it.
    # ``\s+`` also copes with folded headers where "by" starts a new line.
    _BY_HOST_RE = re.compile(r"\bby\s+(?P<host>\S+)")
    _EHLO_HOST_RE = re.compile(r"EHLO (?P<host>[^)]*)")

    def __init__(
        self,
        hostname: str,
        username: str,
        password: str,
        port: int = imaplib.IMAP4_PORT,
        use_ssl: bool = True,
    ):
        """
        Store the connection settings; no network access happens here.

        NOTE: ``port`` defaults to the plain IMAP port even though ``use_ssl``
        defaults to True. Pass the server's TLS port explicitly (for example
        ``imaplib.IMAP4_SSL_PORT``) when connecting over SSL.
        """
        self.hostname = hostname
        self.username = username
        self.password = password
        self.port = port
        self.use_ssl = use_ssl
        # Raw messages (bytes) filled in by analyse(); None until then.
        self.emails = None

    def analyse(self, limit: int = None):
        """
        Download (at most ``limit``) emails and print one hostname per line.

        A falsy ``limit`` (None or 0) means "process the whole inbox".
        """
        self.emails = self.download_emails(limit=limit)
        for hostname in self.process_emails():
            print(hostname)

    def download_emails(self, limit: int = None) -> List[bytes]:
        """
        Download emails from IMAP server.

        Returns the raw RFC822 bytes of each fetched message. When ``limit``
        is truthy, a random sample of that many messages is fetched; if the
        inbox holds fewer messages than ``limit``, all of them are fetched.
        """
        # annoyingly, rather than just passing a flag, we need to use a different class for SSL.
        imap_class = imaplib.IMAP4_SSL if self.use_ssl else imaplib.IMAP4
        with imap_class(host=self.hostname, port=self.port) as imap:
            imap.login(self.username, self.password)
            # Read-only select: fetching RFC822 would otherwise flag every
            # sampled message as seen in the user's mailbox.
            imap.select("INBOX", readonly=True)
            _, data = imap.search(None, "ALL")

            # split() with no argument yields [] for an empty mailbox,
            # whereas split(b" ") would yield [b""] and trigger a bogus fetch.
            all_message_ids = data[0].split() if data and data[0] else []

            if limit:
                # Clamp so that asking for more messages than exist does not
                # make random.sample raise ValueError.
                sample_size = min(limit, len(all_message_ids))
                message_ids = random.sample(all_message_ids, sample_size)
            else:
                message_ids = all_message_ids

            messages = []
            for mid in tqdm.tqdm(message_ids):
                _, data = imap.fetch(mid, "(RFC822)")
                # A successful fetch puts an (envelope, payload) tuple first;
                # skip anything else (e.g. a message expunged in the meantime).
                if not data or not isinstance(data[0], tuple):
                    continue
                messages.append(data[0][1])
        return messages

    def process_emails(self) -> List[str]:
        """
        Extract one relay hostname per email from its ``Received`` headers.

        For each message in ``self.emails``: if there is more than one
        Received header, take the ``by <host>`` host of the second one;
        otherwise (or if that fails) fall back to the ``EHLO <host>`` host of
        the first one. Messages without a usable header are skipped.

        Returns the hostnames in message order (duplicates preserved), or an
        empty list when no emails have been loaded yet.
        """
        hostnames = []
        parser = BytesHeaderParser()

        for raw_message in self.emails or []:
            received_headers = parser.parsebytes(raw_message).get_all("Received")
            if not received_headers:  # this is null for some emails? perhaps ones sent by google?
                continue

            relevant_hostname = None

            # according to the original blog post, if there's >1 received header, we should examine the second.
            if len(received_headers) > 1:
                # str(): headers containing non-ASCII bytes may come back as
                # Header objects, which the regex engine cannot scan.
                match = self._BY_HOST_RE.search(str(received_headers[1]))
                if match:
                    relevant_hostname = match.group("host")

            if relevant_hostname is None:
                match = self._EHLO_HOST_RE.search(str(received_headers[0]))
                if match:
                    relevant_hostname = match.group("host")

            if relevant_hostname:
                hostnames.append(relevant_hostname)

        return hostnames
if __name__ == "__main__":
    # Connection settings live in a local ``credentials`` module that sits
    # next to this script.
    from credentials import HOSTNAME, USERNAME, PASSWORD, PORT

    # Sample up to 1000 messages from the inbox over SSL and print the
    # hostname extracted from each one.
    miner = EmailMiner(
        hostname=HOSTNAME,
        username=USERNAME,
        password=PASSWORD,
        port=PORT,
        use_ssl=True,
    )
    miner.analyse(limit=1000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment