Last active
November 9, 2016 23:10
-
-
Save vane/b6c47d848a2fc991de12ac7e7274c635 to your computer and use it in GitHub Desktop.
Concurrent IMAP email backup in a single Python file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import argparse
import email
import imaplib
import os
import os.path
import sys
import threading
import time

try:
    from itertools import izip_longest
except ImportError:
    # Python 3 renamed izip_longest to zip_longest
    from itertools import zip_longest as izip_longest
class Args:
    """Process-wide settings; start() overwrites them from the command line."""
    # where to connect
    SERVER_NAME = None
    PORT = 993
    # credentials passed to IMAP login
    LOGIN = None
    PASSWORD = None
    # mailbox that gets selected and downloaded
    INBOX_NAME = 'INBOX'
    # local folder that receives one .txt file per message
    DIRECTORY = None
def connect(thread_num, chunk_size=1000):
    """
    Download every message of the configured mailbox using worker threads.

    1. fetch the message ids of ``Args.INBOX_NAME``
    2. split them into chunks of ``chunk_size`` ids
    3. keep up to ``thread_num`` threads busy: a slot gets the next chunk as
       soon as it is empty or its previous thread has finished
    4. log out the listing connection, then wait for the remaining workers

    :param thread_num: number of worker slots (each worker opens its own
                       connection in ``fetch_range``); must be at least 1
    :param chunk_size: number of message ids handed to one worker at a time
    :return: None, once every started worker has finished
    :raises ValueError: if ``thread_num`` is smaller than 1
    :raises RuntimeError: if the mailbox cannot be selected or searched
    """
    if thread_num < 1:
        # with no worker slot the scheduling loop below could never terminate
        raise ValueError('thread_num must be at least 1, got {}'.format(thread_num))
    if not os.path.exists(Args.DIRECTORY):
        os.makedirs(Args.DIRECTORY)
    mail = connect_mailbox()
    threads = [None] * thread_num
    try:
        folders = command(mail, 'list')
        print('Mailboxes : ')
        print(folders)
        # command() returns None for any non-OK status; fail with a clear message
        if command(mail, 'select', Args.INBOX_NAME) is None:
            raise RuntimeError('cannot select mailbox {}'.format(Args.INBOX_NAME))
        data = command(mail, 'search', 'ALL')
        if data is None:
            raise RuntimeError('cannot search mailbox {}'.format(Args.INBOX_NAME))
        email_ids = data[0].split()
        print('total : {}'.format(len(email_ids)))
        to_download = grouper(chunk_size, email_ids)
        # start threads
        running = True
        while running:
            for one in range(thread_num):
                if threads[one] is not None and threads[one].is_alive():
                    continue
                chunk = next(to_download, None)
                if chunk is None:
                    # nothing left to hand out: stop scheduling right away
                    running = False
                    break
                print('-- get range : {}'.format(chunk))
                name = 'Thread-{}'.format(one)
                t = threading.Thread(target=fetch_range, args=(name, chunk))
                t.start()
                threads[one] = t
            if running:
                time.sleep(1)
    finally:
        print('logout from thread runner')
        mail.logout()
    # the workers use their own connections, so they can be awaited after logout
    for t in threads:
        if t is not None:
            t.join()
def connect_mailbox():
    """
    Open an SSL IMAP connection to ``Args.SERVER_NAME``:``Args.PORT`` and log
    in with ``Args.LOGIN`` / ``Args.PASSWORD``.

    Change this method to modify connection to mailbox

    :return: the logged-in ``imaplib.IMAP4_SSL`` object
    :raises Exception: whatever ``login`` raised, re-raised unchanged after the
                       connection has been closed
    """
    mail = imaplib.IMAP4_SSL(host=Args.SERVER_NAME, port=Args.PORT)
    try:
        mail.login(Args.LOGIN, Args.PASSWORD)
    except Exception:
        # do not leave the socket open when authentication fails
        mail.shutdown()
        raise
    return mail
def command(mail, name, data=None):
    """
    Run one of the supported IMAP operations on ``mail``.

    :param mail: connection object providing list/select/search/fetch
    :param name: 'list', 'select', 'search' or 'fetch'
    :param data: mailbox name (select), search criterion (search) or
                 message id (fetch); not used by 'list'
    :return: the response payload when the status passes validate_command,
             otherwise None (also for an unknown ``name``)
    """
    # the lambdas defer every attribute lookup and call until one is chosen
    operations = {
        'list': lambda: mail.list(),
        'select': lambda: mail.select(data),
        'search': lambda: mail.search(None, data),
        'fetch': lambda: mail.fetch(data, '(RFC822)'),
    }
    operation = operations.get(name)
    if operation is None:
        status, payload = None, None
    else:
        status, payload = operation()
    if not validate_command(status):
        return None
    return payload
def validate_command(data):
    """
    Check the status part of an IMAP response.

    :param data: status value returned by the imaplib call
    :return: True when the status equals 'OK', False otherwise
    """
    succeeded = (data == 'OK')
    return succeeded
def fetch_range(name, data):
    """
    Thread target: download every message id in ``data`` over a connection
    of its own.

    :param name: label of the worker, only used in the final log line
    :param data: iterable of message ids; falsy entries (the padding that
                 ``grouper`` adds to the last chunk) are skipped
    :return: None
    """
    STDERR = sys.stderr

    def excepthook(*args):
        # write() instead of the ``print >> file`` statement, which exists
        # only on Python 2; the emitted text is the same
        STDERR.write('caught\n')
        STDERR.write('{}\n'.format(args))

    # NOTE: this replaces the hook for the whole process, not just this thread
    sys.excepthook = excepthook
    # can optimize connections
    mail = connect_mailbox()
    try:
        # select inside the try block so a failure here still logs out
        command(mail, 'select', Args.INBOX_NAME)
        for one in data:
            # prevent None after group
            if not one:
                continue
            fetch_mail(mail, one)
    finally:
        mail.logout()
        print('finish thread {}'.format(name))
def fetch_mail(mail, id):
    """
    Store one message as ``<Args.DIRECTORY>/<id>.txt``.

    Nothing is fetched when that file already exists, so an interrupted
    backup can be resumed. A message that cannot be fetched is reported and
    skipped instead of aborting the remaining messages of the worker.

    :param mail: connection with the mailbox already selected
    :param id: message id as produced by the 'search' command
    :return: None
    """
    # On Python 3 the ids are bytes; decode them so the file is named
    # "12.txt" rather than "b'12'.txt". Python 2 str ids pass through.
    label = id
    if isinstance(id, bytes) and not isinstance(id, str):
        label = id.decode('ascii')
    raw_email = os.path.join(Args.DIRECTORY, '{}.txt'.format(label))
    if os.path.exists(raw_email):
        return
    print('---------------- START {} ----------------'.format(label))
    data = command(mail, 'fetch', id)
    # command() returns None on a non-OK status; a usable answer starts with
    # an (envelope, body) tuple
    if not data or not isinstance(data[0], tuple):
        print('- FETCH FAILED : {}'.format(label))
        return
    body = data[0][1]
    print('- FETCHED : {}'.format(len(body)))
    with open(raw_email, 'wb') as f:
        f.write(body)
    print('---------------- END {} ----------------'.format(label))
def grouper(n, iterable, padvalue=None):
    """
    Lazily cut ``iterable`` into tuples of ``n`` consecutive items.

    :param n: size of every tuple
    :param iterable: source of the items
    :param padvalue: filler used to complete the last tuple
    :return: iterator of ``n``-tuples
    """
    source = iter(iterable)
    # n references to ONE iterator: every tuple pulls n consecutive items
    columns = (source,) * n
    return izip_longest(*columns, fillvalue=padvalue)
def start(args):
    """
    Copy the parsed command line into ``Args`` and run the backup.

    Optional numeric options that are not valid integers fall back to their
    defaults (port: current ``Args.PORT``, threads: 5, chunk size: 1000)
    after printing a notice.

    :param args: argparse namespace with server, directory, login, password,
                 inbox, port, thread and chunk attributes
    :return: None
    """
    # echo the settings, but never the clear-text password
    shown = dict(vars(args))
    if shown.get('password') is not None:
        shown['password'] = '***'
    print(shown)
    Args.SERVER_NAME = args.server
    Args.DIRECTORY = args.directory
    Args.LOGIN = args.login
    Args.PASSWORD = args.password
    if args.inbox:
        Args.INBOX_NAME = args.inbox
    Args.PORT = _int_or_default(
        args.port, Args.PORT,
        "Invalid port number -port/-o must be integer, using {}")
    thread_num = _int_or_default(
        args.thread, 5,
        'Invalid thread number -thread/-t must be integer, using {}')
    chunk_size = _int_or_default(
        args.chunk, 1000,
        'Invalid chunk size -chunk/-c must be integer, using {}')
    connect(thread_num=thread_num, chunk_size=chunk_size)


def _int_or_default(raw, default, warning):
    """Return int(raw); on a missing or non-integer value return default (printing warning for the latter)."""
    if not raw:
        return default
    try:
        return int(raw)
    except (TypeError, ValueError):
        # only conversion errors are tolerated; KeyboardInterrupt etc. propagate
        print(warning.format(default))
        return default
if __name__ == '__main__':
    # Command-line entry point: declare the options, parse them, run start().
    parser = argparse.ArgumentParser()
    # mandatory parameters as (long flag, short flag, help text)
    for long_flag, short_flag, text in (
            ('-server', '-s', "imap server address ex. imap.gmail.com"),
            ('-directory', '-d', "directory to store files"),
            ('-login', '-l', "email login"),
            ('-password', '-p', "email password"),
    ):
        parser.add_argument(long_flag, short_flag, help=text, required=True)
    # optional parameters; start() applies the defaults named in the help
    for long_flag, short_flag, text in (
            ('-inbox', '-i', "inbox name - default INBOX"),
            ('-port', '-o', "imap port name - default 993"),
            ('-thread', '-t',
             "connection number - integer number of threads/connections "
             "used to download mail - default 5"),
            ('-chunk', '-c',
             "chunk size - integer size of chunks used by each thread "
             "to download mail ex. -t 5 -c 100 - each thread will download "
             "100 mail each chunk - default 1000"),
    ):
        parser.add_argument(long_flag, short_flag, help=text)
    start(parser.parse_args())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment