Skip to content

Instantly share code, notes, and snippets.

@pszemraj
Created March 13, 2024 01:53
Show Gist options
  • Save pszemraj/9f57fd3fbe61165f1d3edd69f7550d69 to your computer and use it in GitHub Desktop.
Save pszemraj/9f57fd3fbe61165f1d3edd69f7550d69 to your computer and use it in GitHub Desktop.
Parse a directory of .eml files into a text DataFrame and save it to Parquet.
import logging
from email.parser import BytesParser
from pathlib import Path
import fire
import html2text
import pandas as pd
from tqdm import tqdm
# Configure the root logger: INFO and above, rendered as "LEVEL: message".
logging.basicConfig(
    format="%(levelname)s: %(message)s",
    level=logging.INFO,
)
def _decode_payload(part):
    """Return the transfer-decoded payload of a leaf message part as text."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return ""
    # Honour the charset declared on the part; UTF-8 is only the fallback.
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="ignore")
    except LookupError:
        # Unknown or misspelled charset label: fall back to UTF-8.
        return payload.decode("utf-8", errors="ignore")


def _part_to_text(part):
    """Return plain text for a text/plain or text/html part, else None."""
    ctype = part.get_content_type()
    if ctype == "text/plain":
        return _decode_payload(part)
    if ctype == "text/html":
        # Convert the HTML body to plain text.
        return html2text.html2text(_decode_payload(part))
    return None


def extract_email_data(file_path):
    """Extract email data from a file.

    This function reads an email file, parses its content and returns a
    dictionary with the following keys:

    * date: email date
    * from_name: sender display name ("" if absent)
    * from_address: sender email address ("" if absent)
    * to: recipients
    * subject: email subject
    * cc: carbon copy recipients ("" if absent)
    * bcc: blind carbon copy recipients ("" if absent)
    * message: email body as plain text ("" if no text part is found)

    For multipart messages the first non-attachment ``text/plain`` or
    ``text/html`` part encountered while walking the message is used as the
    body; HTML is converted to plain text with ``html2text``. Body bytes are
    decoded with the charset declared on the part, falling back to UTF-8.

    Parameters
    ----------
    file_path : str or path-like
        Path to the email file to parse.

    Returns
    -------
    email_data : dict
        Dictionary with email data.
    """
    from email.utils import parseaddr

    with open(file_path, "rb") as f:
        msg = BytesParser().parse(f)

    # Extract email body
    body = ""
    if msg.is_multipart():
        # Walk through all parts and stop at the first usable text part.
        for part in msg.walk():
            # str() turns a missing header (None) into "None", which is harmless here.
            if "attachment" in str(part.get("Content-Disposition")):
                continue
            text = _part_to_text(part)
            if text is not None:
                body = text
                break
    else:
        body = _part_to_text(msg) or ""

    # A missing From header must not discard the whole email. str() also
    # covers the case where the parser hands back a Header object instead of
    # a plain string (raw 8-bit header bytes).
    sender = msg.get("From")
    if sender is None:
        from_name, from_address = "", ""
    else:
        from_name, from_address = parseaddr(str(sender))

    # Return a dictionary with the email data
    return {
        "date": msg["Date"],
        "from_name": from_name,
        "from_address": from_address,
        "to": msg["To"],
        "subject": msg["Subject"],
        "cc": msg.get("Cc", ""),
        "bcc": msg.get("Bcc", ""),
        "message": body,
    }
def process_directory(input_dir, output_dir=None):
    """Process all .eml files in the given directory.

    Every ``*.eml`` file directly inside ``input_dir`` (non-recursive) is
    parsed with ``extract_email_data``; the results are collected into a
    DataFrame and written to ``<output_dir>/emails.parquet``. Files that fail
    to parse are logged and skipped.

    Parameters
    ----------
    input_dir : str or path-like
        Directory containing the .eml files.
    output_dir : str or path-like, optional
        Directory for the Parquet file. Defaults to ``<input_dir>/output``.
        It is created if it does not exist.
    """
    import io

    input_path = Path(input_dir)
    if not output_dir:
        output_dir = input_path / "output"
    output_dir = Path(output_dir)
    output_dir.mkdir(exist_ok=True, parents=True)

    logging.info(f"Processing emails in {input_dir}")
    # Sort so that row order is reproducible; glob order is filesystem-dependent.
    email_files = sorted(input_path.glob("*.eml"))
    if not email_files:
        logging.warning(f"No .eml files found in {input_dir}")

    data = []
    for file in tqdm(email_files, desc="Processing"):
        # Deliberately broad: one malformed email must not abort the batch.
        try:
            data.append(extract_email_data(file))
        except Exception as e:
            logging.error(f"Failed to process {file.name}: {e}")

    df = pd.DataFrame(data).convert_dtypes()
    output_file = output_dir / "emails.parquet"
    df.to_parquet(output_file, index=False)

    # DataFrame.info() prints to stdout and returns None, so capture its
    # output in a buffer to route the summary through logging.
    summary = io.StringIO()
    df.info(buf=summary)
    logging.info(summary.getvalue())
    logging.info(f"Data saved to {output_file}")
def main(input_dir, output_dir=None):
    """Command-line entry point: process the emails in ``input_dir``.

    Both arguments are forwarded unchanged to ``process_directory``, which
    parses the .eml files and saves the result as a Parquet file.
    """
    process_directory(input_dir, output_dir=output_dir)
# When run as a script, hand `main` to python-fire to build the command line.
if __name__ == "__main__":
    fire.Fire(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment