Skip to content

Instantly share code, notes, and snippets.

@empirasign
Last active Feb 18, 2022
Embed
What would you like to do?
Instead of forwarding emails, contribute data to Empirasign via API call (interfaces with IMAP protocol--Windows or Linux)
# -*- coding: utf-8 -*-
"""
imap_daq.py
https://gist.github.com/empirasign/6f3feab4730c94be747723f0b56f6502
Stop Forwarding and Start POSTing!
scans imap inbox for emails should be sent to Empirasign servers
and archives them in another folder after sending them accordingly
How it works
- scan all emails in inbox folder
- send these emails to api.empirasign.com/submit-email/
- move each email to a submissions folder
- rinse and repeat
Requirements
Python3
https://www.python.org/downloads/
requests
https://docs.python-requests.org/en/latest/
This script works for both Windows and Linux
"""
import configparser
import datetime
import imaplib
import logging
from logging.handlers import RotatingFileHandler
import os
import platform
import signal
import socket
import sys
import time
import tempfile
import requests
# runtime computed defaults
THIS_DIR = os.path.dirname(__file__) # first place we look for config file
TEMP_DIR = tempfile.gettempdir()
if platform.system() == "Windows":
CACHE_DIR = os.getenv("APPDATA")
else:
CACHE_DIR = os.getenv("HOME") + "/.config"
IMAP = None
def _proxies_dict(proxy_server):
"""
return proxy dictionary as needed by requests library
if this helper function is not comprehensive enough for your use case, consult
http://docs.python-requests.org/en/latest/user/advanced/#proxies
"""
if proxy_server:
return {'http': 'http://' + PROXY_SERVER, 'https': 'https://' + PROXY_SERVER}
return {}
def _handle_exit(sig, frame):
"""
gracefully handle a killed process
https://stackoverflow.com/a/54578211
"""
raise SystemExit
class Config(configparser.ConfigParser):
"""
ConfigParser implementation with validation
"""
final_config = {} # contains config fields with correct data types
def __init__(self, config_file, inline_comment_prefixes=';'):
super().__init__(inline_comment_prefixes=inline_comment_prefixes)
self.read(config_file) # each Section in the config file becomes a class attribute
self.validate_config()
def validate_config(self):
"""
Assigns any missing fields in the configuration file to a default value
All configuration fields are then coerced to the correct data type
ConfigParser by default bans read values from converting to non-strings,
so all coerced fields are assigned to final_config
"""
config_defaults = {
'imap_settings': {
'imap_server': 'imap.gmail.com',
'source_folder': 'INBOX',
'completed_folder': 'PARSED'
},
'user_settings': {
'business_hours_start': 7,
'business_hours_end': 17,
'business_hours_interval': 300,
'non_business_hours_interval': 1800
},
'app_settings': {
'api_url': "https://api.empirasign.com/submit-email/",
'cache_dir': CACHE_DIR,
'log_dir': CACHE_DIR,
'debug': False,
'proxy_server': ""
}}
# check for credentials first
if not self['credentials']:
raise ValueError('Missing credentials section in the config file')
if not self['credentials']['api_key']:
raise ValueError('Missing api_key in the config file')
if not self['credentials']['api_secret']:
raise ValueError('Missing api_secret in the config file')
self.final_config['credentials'] = self['credentials']
for section, fields in config_defaults.items():
# assign entirety of a section and its fields if missing from config file
if section not in self:
self.final_config[section] = fields
continue
self.final_config[section] = {}
for field, val in fields.items():
# assign default value if missing from config file
if field not in self[section] or self[section][field] == '':
self.final_config[section][field] = val
continue
# coerce strings to correct data type based on config_defaults
if isinstance(val, str):
self.final_config[section][field] = self[section][field].strip('\'"')
elif isinstance(val, bool):
self.final_config[section][field] = self.getboolean(section, field)
elif isinstance(val, int):
self.final_config[section][field] = self.getint(section, field)
elif isinstance(val, float):
self.final_config[section][field] = self.getfloat(section, field)
# mutate specific fields
cache_dir = self.final_config['app_settings']['cache_dir']
self.final_config['app_settings']['cache_path'] = os.path.join(
cache_dir, 'imap_daq_cache.dat')
log_dir = self.final_config['app_settings']['log_dir']
self.final_config['app_settings']['log_path'] = os.path.join(log_dir, 'imap_daq.log')
# finally add imap email credentials
if not self['imap_settings']['email_account']:
raise ValueError('Missing email account in the config file')
if not self['imap_settings']['password']:
raise ValueError('Missing email password in the config file')
self.final_config['imap_settings']['email_account'] = self['imap_settings']['email_account'].strip('\'"')
self.final_config['imap_settings']['password'] = self['imap_settings']['password'].strip('\'"')
def _safe_create_folder(folder_name):
"""
create folder_name if it does not already exist
"""
all_folders = [x.decode('utf-8').split(' "/" ')[1][1:-1] for x in IMAP.list()[1]]
if folder_name not in all_folders:
IMAP.create(folder_name)
logger.info("created COMPLETED_FOLDER: %s", folder_name)
else:
logger.info("folder %s already exists", folder_name)
def transmit_mail(data, uid):
"""
given email data, transmit to API endpoint for Empirasign servers
"""
rfc_bytes = data[0][1]
post_data = {
"api_key": CONF['credentials']['api_key'],
"api_secret": CONF['credentials']['api_secret'],
"rfc": rfc_bytes.decode("ascii")
}
try:
resp = requests.post(URL, json=post_data, timeout=15, proxies=_proxies_dict(PROXY_SERVER))
if resp.status_code == 200:
_ = IMAP.uid('MOVE', uid, COMPLETED_FOLDER) # move completed messages
else:
logger.error("status code: %s, response: %s", resp.status_code, resp.text)
except Exception: # pylint: disable=broad-except
logger.exception("network or IO error")
def get_imap_uids(max_size=2500000):
"""
return list of IMAP uids smaller than the max_size
"""
if max_size:
search_str = "(SMALLER {})".format(int(max_size))
else:
search_str = ""
status, data = IMAP.uid('search', None, search_str)
if status == "OK":
return sorted([x.decode("ascii") for x in data[0].split()])
return []
def main_loop():
"""
scan inbox and deliver every detected email
"""
status, _ = IMAP.select(SOURCE_FOLDER, readonly=False)
if status != "OK":
logger.warning("Invalid folder: '%s'", SOURCE_FOLDER)
raise ValueError
target_lst = get_imap_uids()
if not target_lst:
logger.info("No new emails detected in folder: %s", SOURCE_FOLDER)
return
logger.info("%s emails to parse in target folder: %s", len(target_lst), SOURCE_FOLDER)
_safe_create_folder(COMPLETED_FOLDER)
for imap_uid in target_lst:
typ, data = IMAP.uid('fetch', imap_uid, '(RFC822)')
if typ == 'OK':
transmit_mail(data, imap_uid)
def main():
"""
the main event
"""
global IMAP # pylint: disable=global-statement
# authenticate imap login
try:
IMAP = imaplib.IMAP4_SSL(CONF["imap_settings"]['imap_server'])
except socket.gaierror:
logger.error('Invalid imap server: %s', CONF["imap_settings"]['imap_server'])
return
try:
IMAP.login(CONF["imap_settings"]['email_account'], CONF["imap_settings"]['password'])
except imaplib.IMAP4.error:
logger.error('Invalid email credentials')
if CONF["imap_settings"]['imap_server'] == 'imap.gmail.com':
logger.info("For gmail, make sure to also enable IMAP in "\
"account settings and 'less secure apps' in gmail settings "\
"(https://myaccount.google.com/lesssecureapps)")
else:
logger.info("Some services may require certain settings to "\
"be enabled for third-party apps")
return
logger.info('sucessfully logged in to email')
active_start_time = datetime.time(CONF["user_settings"]['business_hours_start'], 0, 0)
active_end_time = datetime.time(CONF["user_settings"]['business_hours_end'], 0, 0)
while True:
main_loop()
curr_time = datetime.datetime.now().time()
if active_start_time <= curr_time <= active_end_time:
loop_time = CONF["user_settings"]['business_hours_interval']
else:
loop_time = CONF["user_settings"]['non_business_hours_interval']
if DEBUG:
loop_time = 10
time.sleep(loop_time)
if __name__ == "__main__":
CONF_PATH = os.path.join(THIS_DIR, 'email_daq.ini')
if not os.path.exists(CONF_PATH):
CONF_PATH = os.path.join(CACHE_DIR, 'email_daq.ini')
CONF = Config(CONF_PATH, inline_comment_prefixes=";").final_config
# do this after loading config
DEBUG = CONF["app_settings"]["debug"]
URL = CONF["app_settings"]["api_url"]
PROXY_SERVER = CONF["app_settings"]["proxy_server"]
# https://requestbin.com/
SOURCE_FOLDER = CONF["imap_settings"]["source_folder"]
COMPLETED_FOLDER = CONF["imap_settings"]["completed_folder"]
rfh = RotatingFileHandler(
filename=CONF["app_settings"]["log_path"],
maxBytes=5 * 1024 * 1024, # 5MB
)
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(name)-25s %(levelname)-8s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[rfh, logging.StreamHandler(sys.stdout)])
logger = logging.getLogger()
signal.signal(signal.SIGTERM, _handle_exit)
try:
main()
except (KeyboardInterrupt, SystemExit):
logger.warning("user hit CTRL-C or killed process, now exiting")
if IMAP:
IMAP.logout()
except Exception as e: # pylint: disable=broad-except
logger.exception('untrapped exception: %s' % e)
logger.info('Please email a copy of this log file "%s" to info@empirasign.com',
CONF["app_settings"]["log_path"])
if IMAP:
IMAP.logout()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment