Last active
May 5, 2024 21:36
-
-
Save HandcartCactus/3ce247dacbe1bd2c7fd1b65eac39556f to your computer and use it in GitHub Desktop.
Portable Pure Python .eml/HTML parser
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Portable Pure Python .eml/HTML parser (no dependencies, just download (modify) and run) | |
Current extraction paths: extracts data from Lyft ride receipt emails
```shell | |
python3 -m eml_html_parser <DIRECTORY WITH EML FILES> | |
``` | |
MIT License | |
""" | |
from argparse import ArgumentParser | |
from datetime import datetime, time | |
from glob import glob | |
from html.parser import HTMLParser | |
from typing import Callable, Optional | |
from typing import Dict, Optional, Tuple | |
import email | |
import os | |
import quopri | |
import re | |
def eml_filepath_to_html(eml_file_path: str) -> str:
    """Return the concatenated HTML content of an `.eml` file.

    Every `text/html` part found while walking the message is decoded and
    appended in order, so a message with several HTML parts yields one string.

    Args:
        eml_file_path: Path of the `.eml` file to read.

    Returns:
        The decoded HTML, or an empty string when the message contains no
        `text/html` part.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(eml_file_path, 'rb') as eml_file:
        msg = email.message_from_bytes(eml_file.read())

    html_parts = []
    for part in msg.walk():
        if part.get_content_type() != "text/html":
            continue
        # decode=True undoes whichever Content-Transfer-Encoding the part
        # declares (quoted-printable, base64, or none at all). Blindly running
        # a quoted-printable decoder over the raw payload garbles base64 parts
        # and corrupts unencoded HTML that merely contains '=' characters.
        payload = part.get_payload(decode=True)
        if payload is None:
            continue
        charset = part.get_content_charset() or 'utf-8'
        try:
            html_parts.append(payload.decode(charset))
        except (LookupError, UnicodeDecodeError):
            # Unknown or mislabelled charset: keep as much text as possible
            # instead of discarding the whole message.
            html_parts.append(payload.decode('utf-8', errors='replace'))
    return "".join(html_parts)
class HtmlToDict(HTMLParser):
    """HTML parser that pulls values out of a document by positional path.

    Implements a simple x-path alternative since xml.etree.ElementTree (which
    allows X-Path) chokes on `< tag />`-type tags in html. LXML or
    beautifulsoup would probably be better for this, but using them would
    violate the pure-python constraint.

    Path format (built up in `self.path` while parsing):
      * every opening tag appends `.tagname`;
      * every self-closing tag (`<br/>`) appends a bare `.`;
      * every closing tag cuts the path back to just before that tag's name,
        which leaves a trailing `.` behind.
    Consecutive dots in a path therefore count the siblings that were already
    closed at that level, which is what distinguishes e.g. the first `<td>`
    of a row (`.tr.td`) from the second (`.tr..td`).
    """

    def __init__(self, extractions: Dict[str, Tuple[str, 'StringToValue']], verbose: bool = False):
        """Set up the parser.

        Args:
            extractions (Dict[str, Tuple[str, StringToValue]]): `{'htmlpath': ('dict_key', str_to_value)}`.
                For each value to extract: the path it lives at, the key to
                store it under, and the callable that converts the stripped
                text into the stored value.
            verbose (bool, optional): If `True`, prints every non-blank piece
                of text along with the html path it was discovered at, so you
                can define an extraction dict. Defaults to False.

        Example:
            Finding html paths for your html data:
            ```python
            >>> h2d = HtmlToDict(extractions={}, verbose=True)
            >>> h2d.feed("<table><tr><td>Access Fee</td><td>$5.99</td></tr></table>")
            .table.tr.td
            Access Fee
            .table.tr..td
            $5.99
            ```
            Extracting with those paths:
            ```python
            >>> h2d = HtmlToDict(extractions={
            ...     '.table.tr.td': ('fee_name', StringToValue()),
            ...     '.table.tr..td': ('fee_cost', StringToValue(transformer=lambda s: float(s[1:]))),
            ... })
            >>> h2d.feed("<table><tr><td>Access Fee</td><td>$5.99</td></tr></table>")
            >>> h2d.extracted
            {'fee_name': 'Access Fee', 'fee_cost': 5.99}
            ```
        """
        super().__init__()
        self.path = ''
        self.extractions = extractions
        self.extracted = {}
        self.verbose = verbose

    def handle_startendtag(self, tag, attrs):
        """Record a self-closing tag as a bare dot (it can never hold text)."""
        self.path += '.'

    def handle_starttag(self, tag, attrs):
        """Descend into `tag` by appending `.tag` to the current path."""
        self.path += '.' + tag

    def handle_endtag(self, tag):
        """Cut the path back to just before the innermost open `tag`.

        The tag is matched against whole path segments. A plain substring
        search would let a stray `</p>` truncate inside `span`, or let `</b>`
        match the `b` of a never-closed `<br>` nested inside it. End tags
        that match no open segment are ignored.
        """
        segments = self.path.split('.')
        for index in range(len(segments) - 1, -1, -1):
            if segments[index] == tag:
                # Keep everything before the tag plus the dot that introduced
                # it; that trailing dot is what marks "one sibling closed".
                self.path = '.'.join(segments[:index]) + '.'
                return

    def handle_data(self, data):
        """Store (and optionally print) non-blank text found at the current path.

        Raises:
            Whatever the configured extractor raises for text it rejects.
        """
        text = data.strip()
        if not text:
            return
        if self.verbose:
            print(self.path)
            print(text)
        if self.path in self.extractions:
            key, extractor = self.extractions[self.path]
            # A later match at the same path overwrites an earlier one.
            self.extracted[key] = extractor(text)
# helper class for extracting/parsing values out of text | |
class StringToValue:
    """Callable that optionally validates a string and then converts it."""

    def __init__(self, validator: Optional[Callable] = None, transformer: Optional[Callable] = None):
        """Configure the validation and conversion steps.

        Args:
            validator (Optional[Callable], optional): Predicate called with the
                raw string; a falsy result rejects the string. `None` (the
                default) skips validation.
            transformer (Optional[Callable], optional): Function called with
                the raw string whose result becomes the value. `None` (the
                default) returns the string unchanged.
        """
        self.validator = validator
        self.transformer = transformer

    def __call__(self, data: str):
        """Validate `data`, then return its converted value.

        Raises:
            ValueError: If a validator is configured and rejects `data`.
        """
        check, convert = self.validator, self.transformer
        # Guard clause: reject first so the transformer never sees bad input.
        if check is not None and not check(data):
            raise ValueError(f"The data '{data}' did not pass validation check.")
        return data if convert is None else convert(data)
######################################## | |
# Lyft Ride Receipt - specific code
######################################## | |
# value parsing functions | |
def parse_time(s: str) -> time:
    """Convert a 12-hour clock string such as `"5:07 PM"` into a `time`.

    Args:
        s: Text of the form `H:MM AM` or `H:MM PM` (the AM/PM marker is
            matched case-insensitively).

    Returns:
        The corresponding `datetime.time` on the 24-hour clock.

    Raises:
        ValueError: If `s` does not split into hours, minutes and a marker,
            or the numbers are not valid for a time of day.
    """
    hours, min_ampm = s.split(':')
    minutes, ampm = min_ampm.split(' ')
    hours = int(hours)
    minutes = int(minutes)
    ampm = ampm.upper()
    if ampm == 'PM' and hours != 12:
        hours += 12
    elif ampm == 'AM' and hours == 12:
        # 12:xx AM is just after midnight, i.e. hour 0 -- not noon.
        hours = 0
    return time(hour=hours, minute=minutes)
def parse_datetime(s: str) -> datetime:
    """Parse a timestamp written like `"May 5, 2024 AT 9:36 PM"`.

    Raises:
        ValueError: If `s` does not match the expected layout.
    """
    # Full month name, day, year, the literal word "AT", then a 12-hour clock.
    receipt_format = "%B %d, %Y AT %I:%M %p"
    return datetime.strptime(s, receipt_format)
def parse_receipt_num(s: str) -> str:
    """Return `s` with every occurrence of the label `'Receipt '` removed."""
    label = 'Receipt '
    # Splitting on the label and re-joining drops each occurrence of it.
    return ''.join(s.split(label))
# define extractors | |
# "$5.99" -> 5.99. Thousands separators ("$1,234.56") are dropped before the
# float conversion so large amounts do not fail to parse.
dollar_amount = StringToValue(
    validator=lambda s: s.startswith('$'),
    transformer=lambda s: float(s[1:].replace(',', '')),
)
# "5:07 PM" -> datetime.time. The pattern is a raw string (a bare "\d" in a
# normal string literal is an invalid escape) and must match the whole text,
# so trailing junk is rejected here instead of being mis-parsed later.
time_value = StringToValue(
    validator=lambda s: re.fullmatch(r'\d{1,2}:\d{2} [AP]M', s) is not None,
    transformer=parse_time,
)
datetime_value = StringToValue(transformer=parse_datetime)
# Takes the characters from index 11 up to two before the first comma and
# converts them to float.
# NOTE(review): presumably skips a fixed 11-character label and a 2-character
# unit suffix such as "mi" -- confirm against a real receipt.
trip_distance = StringToValue(transformer=lambda s: float(s[11:s.index(',')-2]))
receipt_no = StringToValue(transformer=parse_receipt_num)
# No validation, no conversion: the stripped text is stored unchanged.
as_is = StringToValue()
# Maps an HTML path to `(result_key, extractor)` for HtmlToDict.
#
# Keys use the path notation HtmlToDict builds while parsing: each opening tag
# appends ".tag", each self-closing tag appends a bare ".", and each closing
# tag cuts the path back to just before that tag's name (leaving a trailing
# "."). The number of consecutive dots is therefore significant -- do not
# "tidy" these strings.
#
# NOTE(review): these paths were presumably captured with `verbose=True` from a
# sample receipt and will stop matching if the email template changes -- confirm
# against a current email before relying on them.
LYFT_RECEIPT_EXTRACTIONS: Dict[str, Tuple[str, StringToValue]] = {
    '.html..body..table.tr.td.table.tr.td...table..tr.td.table.tr.td.table.tr.td.table.tr.td.a.span': ('datetime', datetime_value),
    '.html..body..table.tr.td.table.tr.td...table...tr.td.table.tr.td.table.tr.td.table.tr.td': ('trip_miles', trip_distance),
    '.html..body..table.tr.td.table.tr.td...table...tr.td.table.tr.td.table.tr.td.table.tr..td': ('fare_dollars', dollar_amount),
    '.html..body..table.tr.td.table.tr.td...table...tr.td.table.tr.td.table.tr.td.table..tr..td': ('other_dollars', dollar_amount),
    '.html..body..table.tr.td.table.tr.td...table...tr.td.table.tr.td.table.tr.td.table...tr..td': ('tip_dollars', dollar_amount),
    '.html..body..table.tr.td.table.tr.td...table.......tr.td.table..tr..td.strong': ('total', dollar_amount),
    '.html..body..table.tr.td.table.tr.td...table................tr.td..table.tr.td..........table.tr.td': ('receipt_no', receipt_no),
    '.html..body..table.tr.td.table.tr.td...table...........tr.td.table.tr.td.table.tr.td.table...tr..td.a': ('pickup', as_is),
    '.html..body..table.tr.td.table.tr.td...table...........tr.td.table.tr.td.table.tr.td.table..tr..td.table.tr.td.a.span': ('pickup_time', time_value),
    '.html..body..table.tr.td.table.tr.td...table...........tr.td.table.tr.td.table..tr.td.table...tr..td.a': ('dropoff', as_is),
    '.html..body..table.tr.td.table.tr.td...table...........tr.td.table.tr.td.table..tr.td.table..tr..td.table.tr.td.a.span': ('dropoff_time', time_value),
}
######################################## | |
# general script | |
######################################## | |
# Command-line interface. The title is passed as `description`: the first
# positional argument of ArgumentParser is `prog`, so passing the title there
# made it show up as the program name in the usage line.
parser = ArgumentParser(description='EML data extractor for Lyft ride receipts')
parser.add_argument('data_dir', help="the root directory to start the recursive search for .eml files.")
def main(data_dir, extractions):
    """Extract data from every `.eml` file found under `data_dir`.

    Args:
        data_dir: Root directory; it and all of its subdirectories are
            searched for `*.eml` files.
        extractions: Path-to-`(key, extractor)` mapping handed to `HtmlToDict`.

    Returns:
        A list with one dict of extracted values per successfully processed
        file, in sorted file-path order. Files that fail are reported on
        stderr and skipped.
    """
    import sys  # local import so this function does not rely on a file-level one

    all_data = []
    glob_pattern = os.path.join(data_dir, '**/*.eml')
    # Sorted so the output order does not depend on directory listing order.
    for email_fp in sorted(glob(glob_pattern, recursive=True)):
        try:
            html_msg = eml_filepath_to_html(email_fp)
            h2d = HtmlToDict(extractions=extractions)
            h2d.feed(html_msg)
            # Flush any text the parser is still buffering at end of input.
            h2d.close()
        except Exception as e:
            # Deliberately broad: one unreadable or unexpected email must not
            # abort the whole batch. Say which file failed and how, and keep
            # the report out of stdout where the data is printed.
            print(f"{email_fp}: {type(e).__name__}: {e}", file=sys.stderr)
        else:
            all_data.append(h2d.extracted)
    return all_data
if __name__ == '__main__':
    # Script entry point: read the directory from the command line, run the
    # extraction over it, and dump the result.
    cli_args = parser.parse_args()
    all_lyft_data = main(cli_args.data_dir, LYFT_RECEIPT_EXTRACTIONS)
    # add your own data processing here
    print("Lyft data:", all_lyft_data, sep="\n")
Author
HandcartCactus
commented
May 5, 2024
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment