Skip to content

Instantly share code, notes, and snippets.

@fferegrino
Created February 14, 2020 05:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fferegrino/4558ac5a58f724db3b6c3f627994d47c to your computer and use it in GitHub Desktop.
Save fferegrino/4558ac5a58f724db3b6c3f627994d47c to your computer and use it in GitHub Desktop.
Reading/parsing Medium emails
from bs4 import BeautifulSoup
# URL prefix whose length is sliced off each link href in parse() to leave the path.
MEDIUM_URL = "https://medium.com/"
# Number of leading characters removed from an href before it is inspected.
URL_LEN = len(MEDIUM_URL)
def _parse_article(article):
    """Extract the per-article fields from one article ``<table>`` element.

    Returns a dict with the post/author/site fields, or ``None`` when the
    table carries no post-title div (i.e. it is not an article entry).
    """
    post_title = (
        article.find("div", {"class": "email-digestPostTitle--hero"})
        or article.find("div", {"class": "email-digestPostTitle"})
    )
    if post_title is None:
        return None

    post_subtitle = article.find("div", {"class": "email-digestPostSubtitle"})

    # The post link is the first anchor next to the title; drop the query string.
    title_link = post_title.parent.find("a", href=True)
    post_url = title_link["href"].partition("?")[0] if title_link is not None else None

    author = (None, None)
    site = (None, None)
    # href=True skips anchors without an href instead of raising KeyError.
    for anchor in article.find_all("a", href=True):
        href = anchor["href"]
        if not href.startswith(MEDIUM_URL):
            # Slicing URL_LEN characters off a foreign URL would yield a
            # meaningless slug, so such links are ignored.
            continue
        first, _, _ = href[URL_LEN:].partition("?")
        # Paths starting with "@" are user handles; anything else is treated
        # as a site slug. Later anchors overwrite earlier ones.
        if first.startswith("@"):
            author = (anchor.text, first)
        else:
            site = (anchor.text, first)

    members_only = article.find("img", {"class": "email-digestMemberOnlyStar"}) is not None

    return {
        "post_title": post_title.text,
        "post_subtitle": post_subtitle.text if post_subtitle is not None else None,
        "post_url": post_url,
        "author_name": author[0],
        "author_handle": author[1],
        "site_name": site[0],
        "site_slug": site[1],
        "members_only": members_only,
    }


def parse(message, **kwargs):
    """Yield one dict per article found in the HTML of a digest email.

    Args:
        message: HTML source of the email body.
        **kwargs: Extra key/value pairs copied into every yielded dict
            (placed before the parsed fields).

    Yields:
        Dicts with the keys ``section_title``, ``post_title``,
        ``post_subtitle``, ``post_url``, ``author_name``, ``author_handle``,
        ``site_name``, ``site_slug`` and ``members_only``, plus ``kwargs``.

    Messages lacking the expected ``email-fillWidth`` / ``email-digest``
    tables yield nothing instead of raising ``AttributeError``.
    """
    soup = BeautifulSoup(message, "lxml")
    main = soup.find("table", {"class": "email-fillWidth"})
    digest = main.find("table", {"class": "email-digest"}) if main is not None else None
    if digest is None:
        return

    for section in digest.find_all("tr", recursive=False):
        cells = section.find_all("td", recursive=False)
        if len(cells) != 1:
            # A section row is expected to hold exactly one cell; skip others.
            continue
        td = cells[0]

        section_title_div = td.find("div")
        if section_title_div is None:
            continue
        section_title = section_title_div.text

        for article in td.find_all("table", recursive=False):
            fields = _parse_article(article)
            if fields is None:
                continue
            yield {**kwargs, "section_title": section_title, **fields}
from imapclient import IMAPClient
import quopri
import email
import csv
import os
from parser import parse
# Connection settings are read from the environment; a missing variable
# raises KeyError as soon as the script starts.
IMAP_SERVER = os.environ["IMAP_SERVER"]  # host passed to IMAPClient
EMAIL_ACCOUNT = os.environ["EMAIL_ACCOUNT"]  # user name used for login
PASSWORD = os.environ["EMAIL_PASS"]  # password used for login
FOLDER = os.environ["FOLDER"]  # mailbox folder that is selected read-only
def get_subject(subject):
    """Decode a (possibly RFC 2047 encoded) Subject header into a string.

    Args:
        subject: Raw header value, or ``None`` when the header is missing.

    Returns:
        The decoded subject with all parts concatenated; ``""`` for ``None``.
        Parts whose declared charset is unknown or does not match the bytes
        are decoded as UTF-8 with undecodable bytes replaced.
    """
    # Imported explicitly: ``import email`` alone does not load the submodule.
    from email.header import decode_header

    if subject is None:
        return ""

    subject_parts = []
    for content, encoding in decode_header(subject):
        if isinstance(content, str):
            # Headers without encoded words come back as str already.
            subject_parts.append(content)
            continue
        try:
            subject_parts.append(content.decode(encoding or "utf8"))
        except (LookupError, UnicodeDecodeError):
            # Unknown charset name or invalid bytes: keep the text readable
            # rather than appending bytes, which would break the join below.
            subject_parts.append(content.decode("utf8", errors="replace"))
    return "".join(subject_parts)
def _html_body(email_message):
    """Return the decoded text/html body of *email_message*, or None if absent.

    Walks every MIME part so both multipart and single-part messages work,
    and lets the email package undo the Content-Transfer-Encoding instead of
    assuming quoted-printable.
    """
    for part in email_message.walk():
        if part.get_content_type() != "text/html":
            continue
        payload = part.get_payload(decode=True)
        if payload is None:
            continue
        charset = part.get_content_charset() or "utf8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            # Declared charset is not known to Python; fall back to UTF-8.
            return payload.decode("utf8", errors="replace")
    return None


# The context manager ensures the IMAP session is cleaned up.
with IMAPClient(host=IMAP_SERVER, use_uid=True) as client:
    client.login(EMAIL_ACCOUNT, PASSWORD)
    # readonly=True: messages are only read, never modified.
    client.select_folder(FOLDER, readonly=True)
    messages = client.search(['NOT', 'DELETED'])
    response = client.fetch(messages, "RFC822")

    # newline="" is what the csv module requires to avoid extra blank lines
    # on some platforms; the explicit encoding keeps non-ASCII text portable.
    with open("mails.csv", "w", newline="", encoding="utf8") as mails_csv:
        fields = [
            "date", "to", "from", "subject",
            "section_title", "post_title", "post_subtitle",
            "post_url", "author_name", "author_handle",
            "site_name", "site_slug", "members_only",
        ]
        writer = csv.DictWriter(mails_csv, fieldnames=fields)
        writer.writeheader()

        for message_id, data in response.items():
            email_message = email.message_from_bytes(data[b'RFC822'])
            raw_subject = email_message.get('Subject')
            # Decode once and reuse for both the progress line and the CSV row.
            subject = get_subject(raw_subject) if raw_subject is not None else ""
            print(message_id, email_message.get('From'), subject)

            html = _html_body(email_message)
            if html is None:
                # No HTML part to parse; skip instead of aborting the export.
                continue

            extra_info = {
                "to": email_message.get("To"),
                "from": email_message.get("From"),
                "subject": subject,
                "date": email_message.get("Date"),
            }
            # parse() is a generator yielding one row dict per article.
            writer.writerows(parse(html, **extra_info))
astroid==2.3.3
beautifulsoup4==4.8.2
IMAPClient==2.1.0
isort==4.3.21
lazy-object-proxy==1.4.3
lxml==4.5.0
mccabe==0.6.1
pylint==2.4.4
six==1.14.0
soupsieve==1.9.5
typed-ast==1.4.1
wrapt==1.11.2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment