Skip to content

Instantly share code, notes, and snippets.

@JulienPalard
Created February 16, 2016 15:10
Show Gist options
  • Save JulienPalard/b2c52f9d301ce0da586c to your computer and use it in GitHub Desktop.
Save JulienPalard/b2c52f9d301ce0da586c to your computer and use it in GitHub Desktop.
Not-so-good example of mail header / body parsing
import os
import re
import sys
from email.parser import Parser
from email.header import decode_header
import logging
# Module-level logger named after this module; it is configured further down
# by the setup_logger() call.
logger = logging.getLogger(__name__)
def setup_logger(logger, level, stdout=False):
    """Attach one handler to *logger* and set its threshold.

    Args:
        logger: The ``logging.Logger`` instance to configure.
        level: Threshold handed to ``logger.setLevel`` (e.g. ``logging.DEBUG``).
        stdout: When true, log to standard output using a timestamped
            format; otherwise log to the local syslog socket ``/dev/log``
            using a ``filename level: message`` format.

    Note:
        Every call adds a new handler; calling this twice on the same
        logger duplicates its output.
    """
    if stdout:
        # StreamHandler() without an argument writes to sys.stderr, which
        # contradicts the parameter name, so the stream is passed explicitly.
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)s: '
                                               '%(message)s'))
    else:
        # Import only the class: a function-scope ``import logging.handlers``
        # would bind ``logging`` as a *local* name and make the branch above
        # fail with UnboundLocalError.
        from logging.handlers import SysLogHandler
        handler = SysLogHandler(address='/dev/log')
        handler.setFormatter(logging.Formatter('%(filename)s %(levelname)s: '
                                               '%(message)s'))
    logger.addHandler(handler)
    logger.setLevel(level)
# Configure the module logger at DEBUG level.  ``stdout`` is left at its
# default (False), so setup_logger() attaches the SysLogHandler on /dev/log.
setup_logger(logger, logging.DEBUG)
# The bare string literal below is evaluated and discarded (it is not a
# docstring).  It serves as a note listing the shapes of reply-quotation
# header lines ("Le ..." / "On ..." and a date-first variant); presumably
# these are the lines the patterns in quotation_regexs are meant to match.
"""
Le : , @ :
On : , @ :
2013-10-31 11:22 GMT+01:00 @ :
"""
# Patterns for the attribution line a mail client inserts above quoted text,
# followed by a blank line and the first ">" of the quotation:
#   - "Le ..." / "On ..." then ":", ",", an "@" address and a closing ":"
#   - a date-first variant such as "2013-10-31 11:22 GMT+01:00 ... @ ...:"
# They are anchored with "^" and must be used with re.M so that an
# attribution on the very first line of the body is recognised too (a
# literal leading "\n" would miss it).
quotation_regexs = (r'^(Le|On)[^:]+:[^,]+,[^@]+@[^:]+:\n\n>',
                    r'^[0-9-]+ [0-9:A-Z+-]+[^@]+@[^:]+:\n\n>')


def get_message_in_body(body):
    """Strip quotations and the signature from a plain-text mail body.

    Args:
        body: The decoded text body, or None.

    Returns:
        The text with carriage returns, quotation attribution lines,
        ">"-quoted lines and everything after the "-- " signature
        delimiter removed, and runs of newlines collapsed to one.
        An empty string when *body* is None.
    """
    if body is None:
        return ''
    # Remove carriage returns so every pattern can rely on "\n" only.
    body = body.replace('\r', '')
    # Fold each attribution line into the quotation that follows it, so
    # that the quotation pass below removes both at once.
    for quotation_regex in quotation_regexs:
        body = re.sub(quotation_regex, '>', body, flags=re.M)
    # Blank out every quoted line.
    body = re.sub('^>.*$', '', body, flags=re.M)
    # Keep only what precedes the signature delimiter ("-- " on its own line).
    body = body.split('\n-- \n')[0]
    # Collapse runs of newlines left behind by the removals.
    body = re.sub('\n+', '\n', body)
    return body
def try_decode_header(header):
    """
    Try to decode an RFC 2047 encoded header, from:
    =?UTF-8?Q?Conseils_d=27utilisation_de_votre_nouvelle_bo=C3=AEte_de_r?=
    to:
    Conseils d'utilisation de votre nouvelle boîte de r

    Every chunk returned by ``decode_header`` is decoded and the results are
    concatenated, so mixed headers such as
    ``=?utf-8?q?J=C3=BCrgen?= <j@example.com>`` keep their unencoded part.

    Args:
        header: The raw header value, or None.

    Returns:
        The decoded text; an empty string when *header* is None.  This is
        best effort: if ``decode_header`` fails, the error is printed to
        stderr and the raw value is returned as text; undecodable bytes are
        replaced rather than raising.
    """
    if header is None:
        return ''
    try:
        chunks = decode_header(header)
    except Exception as ex:
        # Deliberate best-effort boundary: report and fall back to the raw
        # value instead of failing on a malformed header.
        print(ex, file=sys.stderr)
        return header if isinstance(header, str) else str(header)
    decoded = []
    for chunk, charset in chunks:
        if isinstance(chunk, bytes):
            try:
                chunk = chunk.decode(charset or 'utf-8', errors='replace')
            except LookupError:
                # The header names a charset Python does not know.
                chunk = chunk.decode('utf-8', errors='replace')
        decoded.append(chunk)
    return ''.join(decoded)
def get_attachments(mail):
    """Yield the image parts found in *mail*.

    Nested ``multipart/*`` containers are searched recursively, so images
    inside e.g. ``multipart/related`` are found as well.

    Args:
        mail: An ``email.message.Message``.

    Yields:
        ``(mime_type, name, data)`` tuples where *mime_type* is the part's
        content type (e.g. ``image/png``), *name* is the decoded file name
        (empty string when the part declares none) and *data* is the
        payload with its transfer encoding undone.
    """
    if not mail.is_multipart():
        return
    for payload in mail.get_payload():
        # get_content_maintype() is lower-cased and ignores parameters,
        # unlike a substring test on the raw Content-Type header.
        maintype = payload.get_content_maintype()
        if maintype == 'multipart':
            yield from get_attachments(payload)
        elif maintype == 'image':
            # Prefer the Content-Type "name" parameter; when it is missing
            # or RFC 2231 encoded (returned as a tuple), let get_filename()
            # resolve it, which also consults Content-Disposition.
            name = payload.get_param('name')
            if not isinstance(name, str):
                name = payload.get_filename()
            yield (payload.get_content_type(),
                   try_decode_header(name),
                   payload.get_payload(decode=True))
def get_text_body(mail):
    """Return the decoded plain-text body of *mail*.

    Args:
        mail: An ``email.message.Message``.

    Returns:
        For a multipart message, the text of the first ``text/plain`` part
        found (nested ``multipart/*`` containers are searched depth-first),
        or None when there is none.  For a single-part message, its payload
        decoded as UTF-8, falling back to the declared charset and finally
        to latin-1.

    Raises:
        UnicodeDecodeError: When a ``text/plain`` part of a multipart
            message cannot be decoded with its declared charset.
    """
    if not mail.is_multipart():
        raw = mail.get_payload(decode=True)
        try:
            return raw.decode('utf-8')
        except UnicodeDecodeError:
            pass
        try:
            # Not valid UTF-8: honour the charset the message declares.
            return raw.decode(mail.get_content_charset('latin-1'))
        except (LookupError, UnicodeDecodeError):
            # Unknown or wrong charset label; latin-1 never fails.
            return raw.decode('latin-1')
    for payload in mail.get_payload():
        if payload.get_content_maintype() == 'multipart':
            text = get_text_body(payload)
            if text is not None:
                return text
        elif payload.get_content_type() == 'text/plain':
            raw = payload.get_payload(decode=True)
            try:
                # get_content_charset() strips quotes and ignores the other
                # Content-Type parameters (e.g. '; format=flowed').
                return raw.decode(payload.get_content_charset('UTF-8'))
            except UnicodeDecodeError:
                print("Failed to decode a mail with Content-Type:" +
                      payload.get('Content-Type', ''))
                raise
    return None
# Script body: parse one message from standard input, then extract its
# decoded subject (whitespace-stripped), its decoded sender and its
# plain-text body with quotations and signature removed.
# NOTE(review): sys.stdin is a text stream, so the raw message has already
# been decoded with the stream's encoding before Parser sees it; a bytes
# parser fed from sys.stdin.buffer may be safer for 8-bit mail - confirm.
ticket = Parser().parse(sys.stdin)
subject = try_decode_header(ticket['Subject']).strip()
sender = try_decode_header(ticket['From'])
body = get_message_in_body(get_text_body(ticket))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment