Skip to content

Instantly share code, notes, and snippets.

@yszou
Created December 9, 2013 11:50
Show Gist options
  • Save yszou/7871128 to your computer and use it in GitHub Desktop.
Save yszou/7871128 to your computer and use it in GitHub Desktop.
邮件解析
# -*- coding: utf-8 -*-
import uuid
import base64
import datetime
import email
import email.Header
from email.utils import getaddresses, parsedate_tz, mktime_tz
from StringIO import StringIO
'EML解析'
def decode_header(s):
    """Decode an RFC 2047 encoded header value into one text string.

    The header is split into (chunk, charset) pairs by the standard
    library.  Each byte chunk is decoded with its declared charset (or
    UTF-8 when none is declared), then GB18030, and finally ISO-8859-1,
    which accepts any byte sequence.  Chunks that are already text are
    kept unchanged.

    :param s: raw header value.
    :returns: the decoded chunks joined with single spaces, or ``''``
        when the header cannot be parsed at all.
    """
    # Imported locally so the function does not rely on the legacy
    # ``email.Header`` alias being loaded by the surrounding module.
    from email.errors import HeaderParseError
    from email.header import decode_header as split_encoded_words

    try:
        chunks = split_encoded_words(s)
    except HeaderParseError:
        return ''

    decoded = []
    for chunk, charset in chunks:
        if isinstance(chunk, bytes):
            # Mailers frequently mislabel the charset, so fall back to
            # GB18030 before giving up on a faithful decoding.
            for candidate in (charset or 'utf8', 'gb18030'):
                try:
                    chunk = chunk.decode(candidate)
                    break
                except (LookupError, ValueError):
                    # Unknown codec name or undecodable bytes: try next.
                    continue
            else:
                # ISO-8859-1 maps every byte, so this cannot fail.
                chunk = chunk.decode('iso-8859-1')
        decoded.append(chunk)
    return ' '.join(decoded)
def get_transfer_encoding(self):
    """Map the Content-Transfer-Encoding header to a codec name.

    ``self`` only needs a ``get(name)`` method returning the header value.
    Values starting with ``quoted`` or ``base64`` (case-insensitive) map to
    ``'quoted-printable'`` and ``'base64'``; a missing header or any other
    value maps to ``'string_escape'``.
    """
    header = self.get('Content-Transfer-Encoding')
    normalized = header.lower() if header else ''
    prefix_to_codec = (
        ('quoted', 'quoted-printable'),
        ('base64', 'base64'),
    )
    for prefix, codec in prefix_to_codec:
        if normalized.startswith(prefix):
            return codec
    return 'string_escape'
def _get_content(obj):
    """Return the text body of a single (non-multipart) message part.

    The transfer encoding (base64, quoted-printable, ...) is undone by the
    standard library via ``get_payload(decode=True)``.  The resulting bytes
    are decoded with the part's declared charset (UTF-8 when none is
    declared), then GB18030, and finally ISO-8859-1, which accepts any
    byte sequence.

    :param obj: an ``email.message.Message`` part.
    :returns: the decoded text with every carriage return removed; an
        empty string when the part carries no decodable payload.
    """
    raw = obj.get_payload(decode=True)
    if raw is None:
        # get_payload(decode=True) yields None for multipart containers.
        return u''

    if isinstance(raw, bytes):
        # Declared charsets are often wrong or unknown, hence the fallbacks.
        for charset in (obj.get_content_charset('utf8'), 'gb18030'):
            try:
                text = raw.decode(charset)
                break
            except (LookupError, ValueError):
                # Unknown codec name or undecodable bytes: try the next one.
                continue
        else:
            # ISO-8859-1 maps every byte, so this cannot fail.
            text = raw.decode('iso-8859-1')
    else:
        # Payload is already text; nothing left to decode.
        text = raw
    return text.replace(u'\r', u'')
def _ensure_text(value):
    """Return *value* as text, decoding byte strings as UTF-8 then GB18030."""
    if not isinstance(value, bytes):
        return value
    for charset in ('utf8', 'gb18030'):
        try:
            return value.decode(charset)
        except UnicodeError:
            continue
    # Best effort: leave bytes that match neither charset untouched.
    return value


def _parse_addresses(msg, header, lowercase=True):
    """Return the (name, address) pairs found in the *header* field of *msg*."""
    value = decode_header(msg.get(header, ''))
    pairs = getaddresses(value.split(','))
    return [
        (_ensure_text(name),
         _ensure_text(address.lower() if lowercase else address))
        for name, address in pairs
    ]


def _parse_date(value):
    """Convert a Date header into a local naive datetime, defaulting to now."""
    if value is None:
        return datetime.datetime.now()
    parsed = parsedate_tz(value)
    if parsed is None:
        # parsedate_tz signals an unparseable date by returning None.
        return datetime.datetime.now()
    try:
        return datetime.datetime.fromtimestamp(mktime_tz(parsed))
    except (OverflowError, ValueError, OSError):
        # Timestamp outside the range the platform can represent.
        return datetime.datetime.now()


def _attachment_name(part):
    """Return the decoded file name of an attachment part."""
    from email.utils import collapse_rfc2231_value

    # The Content-Type "name" parameter is preferred; the original author
    # found the Content-Disposition filename unreliable, so it is used only
    # as a fallback before inventing a random name.
    raw = part.get_param('name')
    if not raw:
        raw = part.get_param('filename', header='Content-Disposition')
    if not raw:
        return decode_header(uuid.uuid4().hex)
    if isinstance(raw, tuple):
        # RFC 2231 encoded parameter: (charset, language, value).
        return collapse_rfc2231_value(raw)
    # The file name itself may be RFC 2047 encoded.
    return decode_header(raw)


def parse(s):
    """Parse the raw text of an EML message into a dictionary.

    :param s: the complete message source as a string.
    :returns: a dict with the keys

        * ``source`` -- ``StringIO`` over the raw input
        * ``obj`` -- the parsed ``email`` message object
        * ``id``, ``error_to``, ``in_reply_to``, ``referer`` -- raw values of
          Message-ID, Errors-To, In-Reply-To and References
        * ``reply_to``, ``from``, ``to``, ``cc``, ``bcc``, ``list`` -- lists of
          ``(name, address)`` pairs; addresses are lower-cased except for
          ``reply_to``
        * ``date`` -- ``datetime`` from the Date header, or the current time
          when the header is missing or unusable
        * ``subject`` -- decoded subject text
        * ``plain`` / ``data`` -- ``StringIO`` with the plain-text body and
          the main body (HTML when ``is_html`` is true); ``u''`` when absent
        * ``attachment`` / ``in_line`` -- lists of
          ``[content_id, content_type, filename, StringIO(payload)]``
    """
    msg = email.message_from_string(s)
    # Attach the helper as a method on the message class for convenience.
    msg.__class__.get_transfer_encoding = get_transfer_encoding

    m = {}
    m['source'] = StringIO(s)
    m['obj'] = msg
    m['id'] = msg.get('Message-ID', '')
    m['error_to'] = msg.get('Errors-To', '')
    m['in_reply_to'] = msg.get('In-Reply-To', '')
    m['reply_to'] = _parse_addresses(msg, 'Reply-To', lowercase=False)
    m['referer'] = msg.get('References', '')
    m['date'] = _parse_date(msg.get('Date', None))
    m['from'] = _parse_addresses(msg, 'From')
    m['to'] = _parse_addresses(msg, 'To')
    m['cc'] = _parse_addresses(msg, 'CC')
    m['bcc'] = _parse_addresses(msg, 'BCC')
    m['list'] = _parse_addresses(msg, 'List-ID')
    # Make sure the subject is text, not a byte string.
    m['subject'] = _ensure_text(decode_header(msg.get('Subject', '')))

    m['attachment'] = []
    m['in_line'] = []
    m['data'] = u''
    m['plain'] = u''
    m['is_html'] = False

    if not msg.is_multipart():
        # Plain text without attachments: the one body serves both keys.
        # Separate buffers keep the two read cursors independent.
        body = _get_content(msg)
        m['data'] = StringIO(body)
        m['plain'] = StringIO(body)
        return m

    # multipart/alternative is rich text without attachments (it may still
    # embed binaries); any other multipart type may carry attachments,
    # which follow the bodies.
    allow_attachments = msg.get_content_type() != 'multipart/alternative'
    have_plain = False
    have_html = False
    for part in msg.walk():
        if part.is_multipart():
            continue
        content_type = part.get_content_type()
        if content_type == 'text/plain' and not have_plain:
            # First plain-text part.
            m['plain'] = StringIO(_get_content(part))
            have_plain = True
        elif content_type == 'text/html' and not have_html:
            # First rich-text part.
            m['data'] = StringIO(_get_content(part))
            m['is_html'] = True
            have_html = True
        else:
            # Other embedded content, or an attachment.  Let the standard
            # library undo the transfer encoding so 7bit/8bit payloads
            # containing backslashes are not mangled.
            data = part.get_payload(decode=True) or b''
            disposition = part.get('Content-Disposition', '').strip().lower()
            if allow_attachments and disposition.startswith('attachment'):
                m['attachment'].append(
                    ['', content_type, _attachment_name(part), StringIO(data)])
            else:
                content_id = part.get('Content-ID', '').strip().strip('<>')
                filename = uuid.uuid4().hex + '.' + part.get_content_subtype()
                m['in_line'].append(
                    [content_id, content_type, decode_header(filename),
                     StringIO(data)])
    return m
if __name__ == '__main__':
    # Manual smoke test: parse one stored message, pass its main body
    # through the project's EMLCleaner and write the result to demo.html.
    from pprint import pprint
    from clean import EMLCleaner
    # Context managers make sure both files are closed (and the output is
    # flushed) instead of relying on garbage collection.
    with open('../data/db64220326f74f578ee3a9c4106026ff', 'rb') as source:
        s = source.read()
    msg = parse(s)
    data = msg['data'].read()
    clean = EMLCleaner()
    data = clean.clean_html(data)
    with open('demo.html', 'w') as out:
        out.write(data.encode('utf8'))
        # The print statement used previously appended a newline.
        out.write('\n')
    #pprint(msg)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment