Skip to content

Instantly share code, notes, and snippets.

@vane
Last active November 9, 2016 23:10
Show Gist options
  • Save vane/b6c47d848a2fc991de12ac7e7274c635 to your computer and use it in GitHub Desktop.
Save vane/b6c47d848a2fc991de12ac7e7274c635 to your computer and use it in GitHub Desktop.
concurrent email imap backup single python class
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os
import os.path
import email
import imaplib
import threading
import time
import argparse
from itertools import izip_longest
class Args:
SERVER_NAME = None
PORT = 993
LOGIN = None
PASSWORD = None
INBOX_NAME = 'INBOX'
DIRECTORY = None
def connect(thread_num, chunk_size=1000):
"""
Connects to mail
1. fetch email ids
2. split them to chunks with chunk size
3. start threads based on chunks length while chunk are available
:param thread_num:
:param chunk_size:
:return:
"""
if not os.path.exists(Args.DIRECTORY):
os.makedirs(Args.DIRECTORY)
mail = connect_mailbox()
try:
folders = command(mail, 'list')
print('Mailboxes : ')
print(folders)
data = command(mail, 'select', Args.INBOX_NAME)
data = command(mail, 'search', 'ALL')
email_ids = data[0].split()
ids_len = len(email_ids)
to_download = grouper(chunk_size, email_ids)
# start threads
threads = [None]*thread_num
running = True
print('total : {}'.format(ids_len))
while running:
for one in range(0, thread_num):
name = 'Thread-{}'.format(one)
if not threads[one] or not threads[one].is_alive():
chunk = next(to_download, False)
print('-- get range : {}'.format(chunk))
if chunk:
t = threading.Thread(target=fetch_range, args=(name, chunk))
t.start()
threads[one] = t
else:
running = False
time.sleep(1)
finally:
print('logout from thread runner')
mail.logout()
def connect_mailbox():
"""
Change this method to modify connection to mailbox
:return:
"""
mail = imaplib.IMAP4_SSL(host=Args.SERVER_NAME, port=Args.PORT)
mail.login(Args.LOGIN, Args.PASSWORD)
return mail
def command(mail, name, data=None):
rv = None
out = None
if name == 'list':
rv, out = mail.list()
elif name == 'select':
rv, out = mail.select(data)
elif name == 'search':
rv, out = mail.search(None, data)
elif name == 'fetch':
rv, out = mail.fetch(data, '(RFC822)')
if validate_command(rv):
return out
def validate_command(data):
return data == 'OK'
def fetch_range(name, data):
STDERR = sys.stderr
def excepthook(*args):
print >> STDERR, 'caught'
print >> STDERR, args
sys.excepthook = excepthook
# can optimize connections
mail = connect_mailbox()
command(mail, 'select', Args.INBOX_NAME)
try :
for one in data:
# prevent None after group
if not one:
continue
fetch_mail(mail, one)
finally:
mail.logout()
print('finish thread {}'.format(name))
def fetch_mail(mail, id):
raw_email = '{}/{}.txt'.format(Args.DIRECTORY, id)
if not os.path.exists(raw_email):
print('---------------- START {} ----------------'.format(id))
data = command(mail, 'fetch', id)
data = data[0][1]
print('- FETCHED : {}'.format(len(data)))
msg = email.message_from_string(data)
with open(raw_email, 'wb') as f:
f.write(data)
print('---------------- END {} ----------------'.format(id))
def grouper(n, iterable, padvalue=None):
args = [iter(iterable)]*n
return izip_longest(fillvalue=padvalue, *args)
def start(args):
print(args)
Args.SERVER_NAME = args.server
Args.DIRECTORY = args.directory
Args.LOGIN = args.login
Args.PASSWORD = args.password
if args.inbox:
Args.INBOX_NAME = args.inbox
if args.port:
try:
Args.PORT = int(args.port)
except:
print("Invalid port number -port/-o must be integer, using {}".format(Args.PORT))
thread_num = 5
chunk_size = 1000
if args.thread:
try:
thread_num = int(args.thread)
except:
print('Invalid thread number -thread/-t must be integer, using {}'.format(thread_num))
if args.chunk:
try:
chunk_size = int(args.chunk)
except:
print('Invalid chunk size -chunk/-c must be integer, using {}'.format(chunk_size))
connect(thread_num=thread_num, chunk_size=chunk_size)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
# mandatory parameters
parser.add_argument('-server', '-s', help="imap server address ex. imap.gmail.com", required=True)
parser.add_argument('-directory', '-d', help="directory to store files", required=True)
parser.add_argument('-login', '-l', help="email login", required=True)
parser.add_argument('-password', '-p', help="email password", required=True)
# optional parameters
parser.add_argument('-inbox', '-i', help="inbox name - default INBOX")
parser.add_argument('-port', '-o', help="imap port name - default 993")
parser.add_argument('-thread', '-t', help="connection number - integer number of threads/connections used to " +
"download mail - default 5")
parser.add_argument('-chunk', '-c', help="chunk size - integer size of chunks used by each thread to download "
"mail ex. -t 5 -c 100 - each thread will download 100 mail each chunk " +
"- default 1000")
args = parser.parse_args()
start(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment