Created
February 17, 2011 07:08
-
-
Save mopemope/831202 to your computer and use it in GitHub Desktop.
XOAUTH usage example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import imaplib | |
import base64 | |
import email | |
import re | |
from email import header as _header | |
# Patch: replace the encoded-word pattern used by email.header with a more
# permissive one.  The trailing lookahead is able to match the empty string,
# so an encoded word is recognised no matter what text follows it.
ecre = re.compile(
    r'''
  =\?                   # literal =?
  (?P<charset>[^?]*?)   # non-greedy up to the next ? is the charset
  \?                    # literal ?
  (?P<encoding>[qb])    # either a "q" or a "b", case insensitive
  \?                    # literal ?
  (?P<encoded>.*?)      # non-greedy up to the next ?= is the encoded string
  \?=                   # literal ?=
  (?=[ \t]*|$)          # whitespace or the end of the string
  ''',
    re.VERBOSE | re.IGNORECASE | re.MULTILINE,
)
# Install the pattern on the email.header module imported as _header.
_header.ecre = ecre

# Placeholder XOAUTH request string; substitute a real base64-encoded value.
xoauth_string = "xxxxxxx"
def get_subject(mail_body):
    """Return the decoded ``Subject`` header of *mail_body* as text.

    Args:
        mail_body: A mapping-like message object; only ``mail_body['Subject']``
            is read.

    Returns:
        The subject as a text (unicode) string, with every RFC 2047 encoded
        word decoded and all pieces concatenated.  An empty string is
        returned when the header is missing or empty.
    """
    raw_subject = mail_body['Subject']
    if not raw_subject:
        return u''

    # Used for chunks without a charset label and for labels Python does not
    # recognise.  UTF-8 is a superset of ASCII, so plain headers are unchanged.
    fallback = 'utf-8'

    pieces = []
    for chunk, charset in _header.decode_header(raw_subject):
        # decode_header may hand back raw bytes; convert every chunk to text
        # so the final join never mixes bytes with text.
        if isinstance(chunk, bytes):
            try:
                chunk = chunk.decode(charset or fallback, 'ignore')
            except LookupError:
                # Bogus or misspelled charset label: salvage what we can
                # instead of aborting the whole message.
                chunk = chunk.decode(fallback, 'ignore')
        pieces.append(chunk)
    return u''.join(pieces)
def walk_mail(mail_body):
    """Collect the plain-text body of a message.

    Walks every part of *mail_body*, skipping multipart containers, parts
    whose content type is not ``text/plain`` and parts that carry a filename.
    The transfer-decoded payloads of the remaining parts are converted to
    text and concatenated in walk order.

    Args:
        mail_body: A message object providing ``walk()``; each part must
            provide ``get_content_maintype()``, ``get_content_type()``,
            ``get_filename()``, ``get_payload(decode=True)`` and
            ``get_content_charset()``.

    Returns:
        The concatenated text, or an empty string when no part qualifies.
    """
    # Used for parts that declare no charset and for unrecognised labels.
    # UTF-8 is a superset of ASCII, so plain ASCII bodies are unchanged.
    fallback = 'utf-8'

    chunks = []
    for part in mail_body.walk():
        if part.get_content_maintype() == 'multipart':
            continue
        if part.get_content_type() not in ('text', 'text/plain'):
            continue
        if part.get_filename():
            # A part with a filename is skipped, not inlined.
            continue
        payload = part.get_payload(decode=True)
        if not payload:
            continue

        charset = part.get_content_charset() or fallback
        if charset == "iso-9958-1":
            # Misspelled label special-cased here; treat it as Latin-1.
            charset = "iso-8859-1"

        # Always turn bytes into text so that labelled and unlabelled parts
        # can be joined without an implicit (and failing) conversion.
        if isinstance(payload, bytes):
            try:
                payload = payload.decode(charset, 'ignore')
            except LookupError:
                # Unknown charset label: fall back instead of crashing.
                payload = payload.decode(fallback, 'ignore')
        chunks.append(payload)
    return u''.join(chunks)
def parse_mail(msg):
    """Parse a raw message string and print its subject and text body.

    The subject is printed first, then the body framed by two rules of 100
    ``=`` characters.  If anything in that sequence raises, the parsed
    message is printed between two rules of 80 ``*`` characters and the
    exception is re-raised to the caller.

    Args:
        msg: The complete message source, as accepted by
            ``email.message_from_string``.
    """
    message = email.message_from_string(msg)
    body_rule = "=" * 100
    try:
        title = get_subject(message)
        text = walk_mail(message)
        print(title)
        print(body_rule)
        print(text)
        print(body_rule)
    except:  # intentionally bare: dump the message for any failure, then re-raise
        error_rule = "*" * 80
        print(error_rule)
        print(message)
        print(error_rule)
        raise
def TestImapAuthentication(imap_hostname, user, xoauth_string):
    """Authenticate to an IMAP server with XOAUTH and print every INBOX message.

    Connects over SSL, authenticates, prints each entry returned by the
    mailbox listing, then fetches every message in ``INBOX`` and passes its
    RFC822 source to ``parse_mail``.

    Args:
        imap_hostname: Host name of the IMAP server.
        user: Not read by this function; kept so existing callers keep working.
        xoauth_string: Base64-encoded XOAUTH request string.  It is decoded
            here and handed to ``imaplib`` as the authentication response.

    Raises:
        Whatever ``imaplib`` or ``parse_mail`` raise; the mailbox is closed
        and the connection logged out before the exception propagates.
    """
    xoauth_key = base64.b64decode(xoauth_string)
    imap_conn = imaplib.IMAP4_SSL(imap_hostname)
    try:
        # The callback receives the server challenge and ignores it; the
        # response is always the decoded XOAUTH request.
        imap_conn.authenticate('XOAUTH', lambda challenge: xoauth_key)

        retcode, mailboxes = imap_conn.list()
        for mailbox in mailboxes:
            print(mailbox)

        imap_conn.select('INBOX')
        try:
            typ, found = imap_conn.search(None, 'ALL')
            for num in found[0].split():
                typ, fetched = imap_conn.fetch(num, '(RFC822)')
                # fetched[0] is a (header, source) pair; [1] is the message.
                parse_mail(fetched[0][1])
                print("parsed %s" % num)
        finally:
            # Close the selected mailbox even when a message fails to parse.
            imap_conn.close()
    finally:
        # Always end the session so a failure does not leak the connection.
        imap_conn.logout()
if __name__ == "__main__":
    # Only contact the server when run as a script, so that importing this
    # module has no network side effects.
    TestImapAuthentication("imap.googlemail.com", "omeko@gmail.com", xoauth_string)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment