Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import imaplib
import random
import re
from email.parser import BytesHeaderParser
from typing import List, Optional

import tqdm
class EmailMiner:
    """Sample a mailbox over IMAP and list the hosts that relayed its messages.

    Based on method/code described here:
    https://obem.be/2020/02/18/mining-my-mailbox-for-top-email-service-providers.html

    Note: ``port`` defaults to ``imaplib.IMAP4_PORT`` (the standard non-SSL
    IMAP port) while ``use_ssl`` defaults to ``True``. When connecting over
    SSL, pass the server's SSL port (usually ``imaplib.IMAP4_SSL_PORT``)
    explicitly.
    """

    # Host named after the "by" keyword of a Received header. ``search`` is
    # used (not ``match``) because the keyword normally appears mid-header,
    # after the "from ..." clause.
    _BY_HOST = re.compile(r"\bby\s+(?P<host>\S+)")
    # Host announced in an "(EHLO host)" annotation of a Received header.
    _EHLO_HOST = re.compile(r"\bEHLO\s+(?P<host>[^)\s]+)")

    def __init__(self, hostname: str, username: str, password: str,
                 port: int = imaplib.IMAP4_PORT, use_ssl: bool = True):
        """Store the connection settings; no network I/O happens here.

        Args:
            hostname: IMAP server to connect to.
            username: Login name.
            password: Login password.
            port: TCP port of the IMAP server.
            use_ssl: Connect with ``imaplib.IMAP4_SSL`` instead of
                ``imaplib.IMAP4``.
        """
        self.hostname = hostname
        self.username = username
        self.password = password
        self.port = port
        self.use_ssl = use_ssl
        # Raw messages from the last download; None until analyse() (or the
        # caller) assigns them.
        self.emails = None

    def analyse(self, limit: Optional[int] = None) -> None:
        """Download messages, extract relay hostnames and print one per line.

        Args:
            limit: Maximum number of randomly sampled messages to inspect;
                ``None`` (or 0) inspects the whole inbox.
        """
        self.emails = self.download_emails(limit=limit)
        for hostname in self.process_emails():
            print(hostname)

    def download_emails(self, limit: Optional[int] = None) -> List[bytes]:
        """Download raw messages from the INBOX of the IMAP server.

        Args:
            limit: If truthy, fetch a random sample of at most this many
                messages; otherwise fetch every message.

        Returns:
            The raw RFC822 bytes of each message that could be fetched.
        """
        # annoyingly, rather than just passing a flag, we need to use a
        # different class for SSL.
        imap_class = imaplib.IMAP4_SSL if self.use_ssl else imaplib.IMAP4
        messages = []
        with imap_class(host=self.hostname, port=self.port) as imap:
            imap.login(self.username, self.password)
            # Read-only: fetching full messages from a read-write mailbox
            # would flag every sampled message as seen.
            imap.select("INBOX", readonly=True)
            _, data = imap.search(None, "ALL")
            # The search result is one space-separated bytes string of IDs.
            # A bare split() yields [] for an empty mailbox, where
            # split(b" ") would yield [b""].
            all_message_ids = data[0].split() if data and data[0] else []
            for mid in tqdm.tqdm(self._sample_ids(all_message_ids, limit)):
                status, parts = imap.fetch(mid, "(RFC822)")
                # A successful fetch returns a list whose first item is a
                # (envelope, payload) tuple; skip anything else (e.g. a
                # message that vanished between search and fetch).
                if status != "OK" or not parts or not isinstance(parts[0], tuple):
                    continue
                messages.append(parts[0][1])
        return messages

    @staticmethod
    def _sample_ids(message_ids: List[bytes], limit: Optional[int]) -> List[bytes]:
        """Return all IDs, or a random sample of at most ``limit`` of them."""
        if not limit:
            return list(message_ids)
        # Clamp so that a limit larger than the mailbox does not make
        # random.sample raise ValueError.
        return random.sample(message_ids, min(limit, len(message_ids)))

    def process_emails(self) -> List[str]:
        """Extract one relay hostname per message in ``self.emails``.

        Messages without any ``Received`` header, or whose headers match
        neither pattern, contribute nothing.

        Returns:
            Hostnames in message order (duplicates are kept). Empty when no
            messages have been downloaded yet.
        """
        parser = BytesHeaderParser()
        hostnames = []
        for raw_message in self.emails or []:
            received_headers = parser.parsebytes(raw_message).get_all("Received")
            if not received_headers:  # this is null for some emails? perhaps ones sent by google?
                continue
            hostname = self._extract_hostname(received_headers)
            if hostname:
                hostnames.append(hostname)
        return hostnames

    @classmethod
    def _extract_hostname(cls, received_headers: List[str]) -> Optional[str]:
        """Pick the relay hostname out of a message's ``Received`` headers.

        According to the original blog post, if there is more than one
        Received header the second one's ``by`` host is the interesting one;
        otherwise (or if that fails) fall back to the ``EHLO`` host of the
        first header.
        """
        # str(): the parser can hand back Header objects instead of plain
        # strings for headers containing raw 8-bit bytes.
        if len(received_headers) > 1:
            match = cls._BY_HOST.search(str(received_headers[1]))
            if match:
                return match.group("host")
        match = cls._EHLO_HOST.search(str(received_headers[0]))
        return match.group("host") if match else None
if __name__ == "__main__":
    # Connection settings are read from a local ``credentials`` module.
    from credentials import HOSTNAME, USERNAME, PASSWORD, PORT

    miner = EmailMiner(
        hostname=HOSTNAME,
        username=USERNAME,
        password=PASSWORD,
        port=PORT,
        use_ssl=True,
    )
    # Inspect a random sample of up to 1000 inbox messages.
    miner.analyse(limit=1000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment