Created
February 16, 2016 15:10
-
-
Save JulienPalard/b2c52f9d301ce0da586c to your computer and use it in GitHub Desktop.
Not-so-good example of mail header / body parsing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import sys | |
from email.parser import Parser | |
from email.header import decode_header | |
import logging | |
logger = logging.getLogger(__name__) | |
def setup_logger(logger, level, stdout=False):
    """
    Attach a single handler to *logger* and set its level.

    :param logger: the ``logging.Logger`` to configure.
    :param level: logging level to set on *logger* (e.g. ``logging.DEBUG``).
    :param stdout: when true, log through a ``logging.StreamHandler``
        created without an explicit stream, with timestamped records;
        otherwise log to the local syslog socket ``/dev/log``.
    """
    if stdout:
        handler = logging.StreamHandler()
        log_format = '%(asctime)-15s %(levelname)s: %(message)s'
    else:
        # Import the class by name: a function-level
        # ``import logging.handlers`` would bind ``logging`` as a *local*
        # variable for the whole function and make the ``stdout`` branch
        # above fail with UnboundLocalError.
        from logging.handlers import SysLogHandler
        handler = SysLogHandler(address='/dev/log')
        log_format = '%(filename)s %(levelname)s: %(message)s'
    handler.setFormatter(logging.Formatter(log_format))
    logger.addHandler(handler)
    logger.setLevel(level)
# Configure the module logger at import time: DEBUG level, syslog output.
setup_logger(logger, level=logging.DEBUG)
""" | |
Le : , @ : | |
On : , @ : | |
2013-10-31 11:22 GMT+01:00 @ : | |
""" | |
# Patterns matching the attribution line that mail clients insert before a
# quoted reply, together with the blank line and the first '>' that follow.
quotation_regexs = (
    r'\n(Le|On)[^:]+:[^,]+,[^@]+@[^:]+:\n\n>',
    r'\n[0-9-]+ [0-9:A-Z+-]+[^@]+@[^:]+:\n\n>',
)


def get_message_in_body(body):
    """
    Return the part of *body* written by the sender.

    Carriage returns, quotation attribution lines, quoted lines (those
    starting with ``>``) and everything from the first ``-- `` signature
    delimiter onward are removed, then runs of newlines are collapsed into
    one. ``None`` yields an empty string.
    """
    if body is None:
        return ''
    text = body.replace('\r', '')
    # Collapse each attribution header into the quotation that follows it,
    # so it is removed together with the quoted lines below.
    for pattern in quotation_regexs:
        text = re.sub(pattern, '\n>', text, flags=re.M)
    unquoted = re.sub('^>.*$', '', text, flags=re.M)
    # Keep only what precedes the first signature delimiter.
    message, _, _ = unquoted.partition('\n-- \n')
    return re.sub('\n+', '\n', message)
def try_decode_header(header):
    """
    Decode an RFC 2047 encoded header value into text, best effort.

    For example, from:
    =?UTF-8?Q?Conseils_d=27utilisation_de_votre_nouvelle_bo=C3=AEte?=
    to:
    Conseils d'utilisation de votre nouvelle boîte

    Every chunk returned by ``decode_header`` is decoded and the results
    are concatenated, so headers mixing plain and encoded text (such as
    ``=?UTF-8?Q?Name?= <user@example.com>``) are returned in full.

    :param header: the raw header value, or ``None`` if the header is absent.
    :return: the decoded text; ``''`` when *header* is ``None``; the header
        unchanged (as ``str``) when it cannot be parsed.
    """
    if header is None:
        return ''
    try:
        chunks = decode_header(header)
    except Exception as ex:
        # Best effort: report the problem and hand back the raw value
        # rather than failing on a malformed header.
        print(ex, file=sys.stderr)
        return str(header)
    decoded = []
    for chunk, charset in chunks:
        if isinstance(chunk, bytes):
            try:
                # Chunks without a declared charset are treated as UTF-8.
                chunk = chunk.decode(charset or 'utf-8')
            except (LookupError, UnicodeDecodeError):
                # Unknown or wrong charset: keep what can be read.
                chunk = chunk.decode('utf-8', errors='replace')
        decoded.append(chunk)
    return ''.join(decoded)
def get_attachments(mail):
    """
    Yield a ``(mime, name, data)`` tuple for each image part of *mail*.

    Only multipart messages are inspected. Parts whose Content-Type
    contains ``multipart/alternative`` are searched recursively. For every
    part whose Content-Type contains ``image/``, the tuple holds the
    content type, the decoded ``name`` parameter and the decoded payload.
    """
    if not mail.is_multipart():
        return
    for part in mail.get_payload():
        content_type = part.get('Content-Type', '')
        if 'multipart/alternative' in content_type:
            yield from get_attachments(part)
        if 'image/' in content_type:
            part_params = part.get_params()
            # The first parameter pair carries the content type itself.
            mime = part_params[0][0]
            name = dict(part_params).get('name')
            yield (mime, try_decode_header(name),
                   part.get_payload(decode=True))
def get_text_body(mail):
    """
    Return the plain-text body of *mail* as a ``str``.

    For a multipart message, the parts are scanned in order: a
    ``multipart/alternative`` part is searched recursively, and the first
    ``text/plain`` part found is decoded using its declared charset
    (UTF-8 when none is declared). For a non-multipart message, the
    payload is decoded as UTF-8, falling back to latin-1.

    :param mail: an ``email.message.Message``.
    :return: the decoded text, or ``None`` when a multipart message
        contains no ``text/plain`` part.
    :raises UnicodeDecodeError, LookupError: when a ``text/plain`` part
        cannot be decoded with its declared charset.
    """
    if not mail.is_multipart():
        raw = mail.get_payload(decode=True)
        try:
            return raw.decode('utf-8')
        except UnicodeDecodeError:
            # latin-1 maps every byte value, so this fallback cannot fail.
            return raw.decode('latin-1')
    for payload in mail.get_payload():
        content_type = payload.get('Content-Type', '')
        if 'multipart/alternative' in content_type:
            text = get_text_body(payload)
            # Keep scanning the sibling parts when the alternative holds
            # no text/plain version.
            if text is not None:
                return text
        if 'text/plain' in content_type:
            decoded_payload = payload.get_payload(decode=True)
            # get_content_charset() copes with quoted values and with
            # parameters following the charset, unlike a manual split on
            # 'charset='.
            charset = payload.get_content_charset('UTF-8')
            try:
                return decoded_payload.decode(charset)
            except (LookupError, UnicodeDecodeError):
                print("Failed to decode a mail with Content-Type:" +
                      payload['Content-Type'])
                raise
    return None
# Parse one message from standard input and extract its subject, sender
# and the sender-written part of its plain-text body.
ticket = Parser().parse(sys.stdin)
subject = try_decode_header(ticket.get('Subject')).strip()
sender = try_decode_header(ticket.get('From'))
body = get_message_in_body(get_text_body(ticket))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment