Skip to content

Instantly share code, notes, and snippets.

@pjanzen
Created January 25, 2019 20:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pjanzen/965fb0fd91521ee2e927c808e8dfc0e7 to your computer and use it in GitHub Desktop.
Save pjanzen/965fb0fd91521ee2e927c808e8dfc0e7 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys
import logging
import imaplib
import email
import email.header
from datetime import datetime, timedelta
import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers
logger = logging.getLogger('pollImap')
logger.setLevel(logging.INFO)
fh = logging.FileHandler('/var/log/pollimap.log')
fh.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
logger.addHandler(fh)
class App:
def __init__(self):
self.es = Elasticsearch(timeout=30, hosts=['10.80.3.7', '10.80.3.6', '10.80.3.5', '10.80.3.4'], http_auth=('elastic', 'xxxxx'))
self.m = imaplib
def connect(self):
self.m = imaplib.IMAP4_PORT = 143
self.m = imaplib.IMAP4('imap server')
self.m.login('login', 'username')
self.m.select('INBOX', readonly=False)
def disconnect(self):
self.m.close()
self.m.logout()
def cleanup_mailbox(self):
e_res = self.m.expunge()
logger.info("Expunging {}".format(e_res))
return e_res
def process(self):
resp, items = self.m.search(None, 'ALL')
items = items[0].split()
es_data = []
mail_ids = []
utc_index = datetime.utcnow() - timedelta(days=0)
utc_index = utc_index.strftime("%Y.%m.%d")
index_name = 'complaint_data-{}'.format(utc_index)
for emailid in items:
print "Fetching mailID: {}".format(emailid)
try:
resp, data = self.m.fetch(emailid, '(RFC822)')
mail = email.message_from_string(data[0][1])
res, data = self.m.store(emailid, '+FLAGS', '\Deleted')
print "Mark {} deleted: {}".format(emailid, res)
if mail.get_content_maintype() != 'multipart':
continue
for part in mail.walk():
if part.get('Content-Disposition') == 'inline':
for line in part.get_payload():
actions = {
"complaint_source": mail['From'],
"timestamp": datetime.now(),
"complaint_date": mail['Date']
}
sourceIP = None
date_send = None
authSender = None
subject = None
try:
for key in line.keys():
if key == 'X-SourceIP':
sourceIP = line.get(key)
actions[key] = line.get(key)
elif key == 'Date':
date_send = line.get(key)
actions['send_date'] = line.get(key)
elif key == 'X-Authenticated-Sender':
tt = line.get(key).split(' ')
authSender = tt[0]
actions[key] = tt[0]
elif key == 'Subject':
if '=?utf-8' in line.get(key) or '=?UTF-8' in line.get(key) or '=?utf8' in line.get(key):
tt = email.header.decode_header(line.get(key))
if isinstance(tt, (list, tuple)) and not isinstance(tt, basestring):
subject = tt[0][0]
else:
subject = line.get(key)
actions[key] = subject
else:
continue
except AttributeError:
continue
self.es.index(index=index_name, doc_type='complaints', body=actions, timestamp=datetime.now())
except Exception, e:
logger.info("Could not fetch message: {}, exiting...".format(emailid))
return False
return True
if __name__ == '__main__':
do = App()
do.connect()
res = False
while res is False:
res = do.process()
if res:
logger.info("We processed all messages, exiting...")
do.cleanup_mailbox()
do.disconnect()
sys.exit(0)
else:
logger.info("Closing INBOX and logout of server...")
do.disconnect()
logger.info("Sleeping for 5 sec before continueing..")
time.sleep(5)
do.connect()
res = do.process()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment