Skip to content

Instantly share code, notes, and snippets.

@reagle
Last active February 20, 2023 15:54
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save reagle/1fe5e496ed7c772b68aa60c4d827abc9 to your computer and use it in GitHub Desktop.
Save reagle/1fe5e496ed7c772b68aa60c4d827abc9 to your computer and use it in GitHub Desktop.
Pretty print a mailbox, since some previous date, as a simple HTML file
#!/usr/bin/env python3
"""Pretty print a mailbox, since some previous date, as a simple HTML file"""
import argparse  # http://docs.python.org/dev/library/argparse.html
import email.errors
# http://docs.python.org/lib/module-email.Utils.html
# from email.utils import parsedate
import email.parser
import html
import logging
import mailbox
import os
import re
import subprocess
import sys
import tempfile
import textwrap
import urllib
import urllib.parse
from datetime import datetime  # https://docs.python.org/3/library/datetime
from pathlib import Path  # https://docs.python.org/3/library/pathlib.html

from dateutil import relativedelta as rd
from dateutil.parser import parse
from dateutil.tz import tzlocal  # https://dateutil.readthedocs.io/en/stable/

import markup  # https://tylerbakke.github.io/MarkupPy/
HOME = str(Path("~").expanduser())
debug = logging.debug
info = logging.info
warn = logging.warn
error = logging.error
critical = logging.critical
exception = logging.exception
# Email stuff #############################
def msgfactory(mbox_fp):
    """Parse a single email message from an open text file object.

    Presumably meant to be passed as the ``factory`` of a mailbox class
    (it is not referenced elsewhere in the visible code).

    Args:
        mbox_fp: text file-like object to read the message from.

    Returns:
        The parsed message object, or ``""`` if the parser raises
        ``email.errors.MessageParseError``.
    """
    try:
        return email.message_from_file(mbox_fp)
    except email.errors.MessageParseError:
        # Don't return None since that will
        # stop the mailbox iterator
        return ""
def get_headers(msg):
    """Extract the headers used for display and sorting from a message.

    Args:
        msg: message object offering ``get(header_name)``.

    Returns:
        Tuple ``(subject, sender, msg_date)`` where ``subject`` is the
        HTML-escaped Subject header, ``sender`` is the HTML-escaped From
        header cut at its first "@" and re-closed with ">", and ``msg_date``
        is the Date header parsed by ``dateutil.parser.parse``.
    """
    # get() yields None for an absent header; fall back to "" so that
    # html.escape()/split() do not raise on messages lacking Subject or From.
    subject = html.escape(msg.get("subject") or "")
    sender = html.escape((msg.get("from") or "").split("@")[0] + ">")
    msg_date = parse(msg.get("date"))
    return subject, sender, msg_date
# Date stuff ##############################
# Integer weekday (0 == Monday, the numbering datetime.weekday() uses)
# mapped to the corresponding dateutil weekday object.
INT2DAY = dict(enumerate((rd.MO, rd.TU, rd.WE, rd.TH, rd.FR, rd.SA, rd.SU)))
# DAY2INT = {v: k for k, v in INT2DAY.items()}
# Integer constants 0..6, unpacked from the mapping's keys in insertion order.
MO, TU, WE, TH, FR, SA, SU = INT2DAY
def get_previous_class(classes, today):
    """Return the most recent scheduled class day strictly before ``today``.

    Finds the scheduled class days of the week before today and selects the
    max/latest, wrapping around to the previous week when today is on or
    before the first class day of the week.

    Args:
        classes: non-empty iterable of integer weekdays (MO..SU, i.e. 0..6).
        today: integer weekday of the current day.

    Returns:
        The ``INT2DAY`` entry (dateutil weekday object) of the previous class.

    >>> get_previous_class((TU, FR), MO)
    FR
    >>> get_previous_class((TU, FR), WE)
    TU
    >>> get_previous_class((TU, FR), FR)
    TU
    >>> get_previous_class((TU, FR), SA)
    FR
    >>> get_previous_class((MO, SA), MO)
    SA
    """
    info(f" classes = '{str(classes)}' today = '{today}'")
    # if today's day precedes this week's class days act as if
    # I'm a week ahead -- this is simpler than moving classes back a week.
    # The stand-in is one past Sunday (not SA) so that classes held on
    # Saturday or Sunday remain candidates for "previous class".
    if today <= min(classes):
        today = SU + 1
        info(f" today adjusted ={today}")
    previous_class = max(c for c in classes if c < today)
    info(f" previous_class = {previous_class}")
    return INT2DAY[previous_class]
def get_previous_class_date():
    """Return the local, timezone-aware datetime of 6PM on the previous class day.

    The class schedule is fixed to Tuesday and Friday.
    """
    now = datetime.now(tzlocal())  # could use (pytz.timezone('US/Eastern'))
    now_day = now.weekday()
    info(f"{now_day=}")

    prev_class = get_previous_class((TU, FR), now_day)
    info(f"{prev_class=}")

    # since 6PM day of last class: jump back to that weekday (or stay on it
    # if it is today, per prev_class(-1)) and pin the clock to 18:00:00
    back_to_class = rd.relativedelta(
        weekday=prev_class(-1), hour=18, minute=0, second=0
    )
    prev_class_date = now + back_to_class
    info(f"{prev_class_date=}")
    return prev_class_date
# Pandoc wrapper ##########################
def mkd2html(mkd):
    """Convert markdown text to HTML by piping it through ``pandoc``.

    The text is sent as stripped UTF-8 bytes on stdin; pandoc's stdout is
    decoded as UTF-8 (undecodable bytes replaced) and returned.
    """
    pandoc_cmd = [
        "pandoc",
        "--from=markdown+autolink_bare_uris"
        "-blank_before_header-space_in_atx_header",
        "--to=html",
    ]
    stdin_bytes = mkd.encode("utf-8", "replace").strip()
    finished = subprocess.run(pandoc_cmd, input=stdin_bytes, stdout=subprocess.PIPE)
    return finished.stdout.decode("utf-8", "replace")
def html2mkd(html_src):
    """Convert HTML text to strict markdown by piping it through ``pandoc``.

    The HTML is sent as stripped UTF-8 bytes on stdin; pandoc's stdout is
    decoded as UTF-8 (undecodable bytes replaced) and returned.
    """
    pandoc_cmd = [
        "pandoc",
        "--from=html-raw_html-native_divs-native_spans",
        "--to=markdown_strict",
    ]
    stdin_bytes = html_src.encode("utf-8", "replace").strip()
    finished = subprocess.run(pandoc_cmd, input=stdin_bytes, stdout=subprocess.PIPE)
    return finished.stdout.decode("utf-8", "replace")
# Textual tools ###########################
# Outlook "safelinks" wrapper. Every piece is bounded to URL characters and
# the prefix is matched lazily, so several protected links on one line are
# matched one by one instead of being swallowed into a single match.
_RE_SAFELINK = re.compile(
    r"""
    (https://na[^\s/]*?\d\d\.safelinks[^\s"'<>]+?\?url=)  # MS link
    ([^\s"'<>&]+)                                         # encoded URL
    (&(?:amp;)?data[^"'>\)\s]+)                           # closing cruft
    """,
    re.VERBOSE,
)


def unsafe_links(content):
    """remove annoying outlook link protection

    Each protected URL found in ``content`` is replaced by the
    percent-decoded URL it wraps. The query separator is accepted both
    HTML-escaped (``&amp;data=``) and raw (``&data=``).

    Args:
        content: text possibly containing safelinks URLs.

    Returns:
        ``content`` with every matched safelinks URL unwrapped.

    >>> unsafe_links('<https://na01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fmoneyish.com%2Fish%2Fmillennials-are-killing-bar-soap%2F&amp;data=02%7C01%7Cj.reagle%40northeastern.edu%7C2ed70276476f460557e908d615d1d419%7Ca8eec281aaa34daeac9b9a398b9215e7%7C0%7C0%7C636720388297012848&amp;sdata=twrmHnYyqd0pf%2FH3wFIva4Vnwnmmxq1ajkOUtAUQtZ0%3D&amp;reserved=0>')
    '<https://moneyish.com/ish/millennials-are-killing-bar-soap/>'
    """  # noqa: E501
    info("content = '%s'", content)
    # finditer() scans the original string; rebinding ``content`` inside the
    # loop does not disturb the iteration.
    for match in _RE_SAFELINK.finditer(content):
        info("match = '%s'", match)
        safe_url = match.group(0)
        info("safe_url = '%s'", safe_url)
        encoded_url = match.group(2)
        info("encoded_url = '%s'", encoded_url)
        decoded_url = urllib.parse.unquote(encoded_url)
        info("decoded_url = '%s'", decoded_url)
        content = content.replace(safe_url, decoded_url)
    info("new_content = '%s'", content)
    return content
def rewrap_text(content):
    """Re-wrap overlong lines and enclose the text in a ``~~~`` fence.

    Lines longer than 80 characters are re-filled with ``textwrap.fill``
    (default width); every line is emitted preceded by a newline.
    """
    rows = (
        textwrap.fill(row) if len(row) > 80 else row
        for row in content.split("\n")
    )
    body = "".join("\n" + row for row in rows)
    return "\n~~~\n" + body + "\n~~~\n"  # in fenced code block?
def dedent_content(content):
    """Replace each newline plus the whitespace run after it with a blank line."""
    newline_then_space = re.compile(r"\n\s+")
    return newline_then_space.sub("\n\n", content)
def _attachment_to_text(payload, command):
    """Save ``payload`` bytes to a temp file, run ``command`` on it, return stdout."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        doc_path = os.path.join(tmp_dir, "mail-part-msw")
        # Word documents are binary: write the raw bytes, never decoded text.
        with open(doc_path, "wb") as doc_fp:
            doc_fp.write(payload)
        try:
            # Argument list (no shell) and captured stdout instead of a
            # redirect to a fixed, predictable path under /tmp.
            finished = subprocess.run(
                [command, doc_path],
                stdout=subprocess.PIPE,
                text=True,
                errors="replace",
            )
        except OSError as err:
            # Converter missing or not executable: carry on with no text.
            error(f"could not run {command}: {err}")
            return ""
    return finished.stdout


def _collect_parts(msg, args):
    """Return ``{content subtype: rendered content}`` for the parts of ``msg``."""
    parts = {}
    for part in msg.walk():
        debug("--------")
        debug(f"{part=}")
        msg_content_type = part.get_content_subtype()
        processed_as_type = msg_content_type
        # Parts that declare no charset are decoded as WINDOWS-1252.
        charset = part.get_content_charset() or "WINDOWS-1252"
        debug(f"{charset=}")
        if msg_content_type == "plain":
            debug(f"part IS plain: {msg_content_type}")
            processed_as_type += "+" + charset
            content = part.get_payload(decode=True).decode(charset, "replace")
            debug(f"{type(content)=}")
            content = unsafe_links(content)
            if content.startswith("<html>"):
                content = html2mkd(content)
                processed_as_type += "+html2mkd()"
            if args.text:
                content = rewrap_text(content)
                processed_as_type += "+rewrap()"
            debug(f"content = {content[0:250]}")
            parts[msg_content_type] = mkd2html(content)
            # TODO 220119: continue and skip html if found markdown?
        elif msg_content_type == "html":
            debug("part is HTML: %s" % msg_content_type)
            processed_as_type += "+html2mkd()"
            content = part.get_payload(decode=True).decode(charset, "replace")
            # convert to markdown to strip out junk
            markdown = html2mkd(content)
            debug(f"{markdown=}")
            # then convert back to simple HTML
            html_result = mkd2html(markdown)
            debug(f"{html_result=}")
            # Store the cleaned-up HTML, not the raw source it came from.
            parts[msg_content_type] = html_result
        else:
            debug("part NOT plain: %s" % msg_content_type)
            if msg_content_type in ("msword", "octet-stream"):
                processed_as_type += "+doc"
                debug(f"DOC {processed_as_type}")
                command = "antiword"
            elif (
                msg_content_type
                == "vnd.openxmlformats-officedocument.wordprocessingml.document"
            ):
                command = "docx2txt.sh"
                processed_as_type += "+docx"
                debug(f"DOCX {processed_as_type}")
            else:
                debug("don't know type, try next part")
                continue  # don't know what it is, try next part
            payload = part.get_payload(decode=True) or b""
            extracted = _attachment_to_text(payload, command)
            # Lines are joined with a single space, each keeping its newline.
            content = " ".join(extracted.splitlines(keepends=True))
            content = html.escape(content)
            if args.text:
                content = rewrap_text(content)
            parts[msg_content_type] = content
    return parts


def convert(filename, args):
    """Render the messages of one mbox file as ``<name>-responses.html``.

    Messages dated after the previous class (or all of them with
    ``args.all``) are sorted by date or by sender last name and written as
    one HTML page. Per message the ``plain`` part is preferred, then the
    ``html`` part, then whatever other part could be converted.

    Args:
        filename: path of the mbox file; a trailing "/" and a leading "."
            of its base name are dropped when naming the output file.
        args: parsed options; ``output_dir``, ``all``, ``date_sort`` and
            ``text`` are read.
    """
    info(f"{filename=}")
    if filename.endswith("/"):
        filename = filename[0:-1]
    base_name = os.path.basename(filename)
    base_name = base_name if not base_name.startswith(".") else base_name[1:]
    info(f"{base_name=}")
    html_fn = os.path.join(args.output_dir, base_name + "-responses.html")
    info(f"{args.output_dir=}; {html_fn=}")
    section_number = os.path.basename(filename)
    # mbox = mailbox.Maildir(filename, factory=mailbox.MaildirMessage)
    mbox = mailbox.mbox(filename, factory=mailbox.MaildirMessage)
    prev_class_date = get_previous_class_date()

    page = markup.page()
    page.init(
        title="Student Responses %s" % section_number,
        css="https://reagle.org/joseph/2005/01/responses.css",
        charset="utf-8",
    )
    page.h1(section_number)

    relevant_msgs = []
    for msg in mbox:
        subject, sender, msg_date = get_headers(msg)
        if args.all or msg_date > prev_class_date:
            info("sender = '%s'" % sender)
            name = sender.rsplit(" ", 1)[0]  # remove email address
            if len(name.split(" ")) > 1:
                last_name = name.rsplit(" ", 1)[1]
                if "bin" in name.lower():
                    last_name = "bin " + last_name
            else:
                last_name = name
            info("last_name = '%s'" % last_name)
            # msg_date is kept to sort on; sender is kept for rendering
            relevant_msgs.append((msg_date, last_name, sender, msg))
    mbox.close()

    if args.date_sort:
        # Compare (date, name) only: message objects are not orderable, so
        # a tie on both fields must not fall through to comparing them.
        relevant_msgs.sort(key=lambda entry: entry[:2])
    else:  # sort on name
        relevant_msgs.sort(key=lambda entry: entry[1].lower())

    for _, last_name, sender, msg in relevant_msgs:
        critical("\n")
        critical("============================")
        critical(f"{last_name=}")
        # sender ends in "<local-part>&gt;"; take the field just before the
        # final ";" so escaped entities in the display name cannot shift
        # the index, then drop the trailing "&gt".
        sender_email = sender.rsplit(";", 2)[-2][:-3]
        page.div.open()
        page.hr()
        page.h1.open()
        # page.a(e.p(sender, class_="sender"), href=f"#{sender_email}")
        page.a("↪", class_="link", href=f"#{sender_email}")
        page.span(sender, class_="sender", id=f"{sender_email}")
        page.h1.close()

        parts = _collect_parts(msg, args)
        info(f"{parts.keys()=}")
        if "plain" in parts:  # first preference
            page.div(parts["plain"])
        elif "html" in parts:  # second preference
            page.div(parts["html"])
        elif parts:
            # first of whatever is there (parts is keyed by subtype name)
            page.pre(next(iter(parts.values())))
        page.div.close()

    # The page declares utf-8 and contains non-ASCII text, so write it as
    # UTF-8 regardless of the locale; ``with`` guarantees the file is closed.
    with open(html_fn, "w", encoding="utf-8") as html_fp:
        html_fp.write(str(page))
def main(argv):
    """Parse ``argv``, configure logging from the verbosity flags, return args."""
    arg_parser = argparse.ArgumentParser(description="print HTML from mbox")

    # positional arguments
    arg_parser.add_argument("files", nargs="*", metavar="FILE")

    # optional arguments: the plain on/off switches share one shape
    for short_flag, long_flag, help_text in (
        ("-a", "--all", "print all messages irrespective of date"),
        ("-t", "--text", "text (unformatted) rather than markdown"),
        ("-d", "--date-sort", "sort by date"),
    ):
        arg_parser.add_argument(
            short_flag, long_flag, action="store_true", default=False, help=help_text
        )
    # default for the output directory is filled in by the caller
    arg_parser.add_argument(
        "-o", "--output_dir", metavar="DIRECTORY", help="output directory"
    )
    arg_parser.add_argument(
        "-L",
        "--log-to-file",
        action="store_true",
        default=False,
        help="log to file %(prog)s.log",
    )
    arg_parser.add_argument(
        "-V",
        "--verbose",
        action="count",
        default=0,
        help="Increase verbosity (specify multiple times for more)",
    )
    arg_parser.add_argument("--version", action="version", version="TBD")
    args = arg_parser.parse_args(argv)

    # -V -> ERROR (40), -VV -> INFO (20), -VVV and up -> DEBUG (10);
    # without -V the threshold is 100, above every standard level.
    if args.verbose >= 3:
        log_level = logging.DEBUG
    else:
        log_level = {1: logging.ERROR, 2: logging.INFO}.get(args.verbose, 100)

    LOG_FORMAT = "%(levelno)s %(funcName).5s: %(message)s"
    log_config = {"level": log_level, "format": LOG_FORMAT}
    if args.log_to_file:
        log_config.update(filename="mbx-pp.log", filemode="w")
    logging.basicConfig(**log_config)
    return args
if "__main__" == __name__:
args = main(sys.argv[1:])
if not args.files:
args.files = [
f"{HOME}/data/tbird-reagle/Mail/Local/classes.sbd/neu-cda",
# f"{HOME}/data/tbird-reagle/Mail/Local/classes.sbd/neu-oc",
f"{HOME}/data/tbird-reagle/Mail/Local/classes.sbd/neu-pc",
]
if not args.output_dir:
args.output_dir = HOME + f"/joseph/{datetime.now().year}/"
for filename in args.files:
info(f"STARTING = {filename}")
convert(filename, args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment