Skip to content

Instantly share code, notes, and snippets.

Last active Feb 18, 2022
What would you like to do?
Instead of forwarding emails, contribute data to Empirasign via API call (interfaces with IMAP protocol--Windows or Linux)
# -*- coding: utf-8 -*-
Stop Forwarding and Start POSTing!
scans imap inbox for emails should be sent to Empirasign servers
and archives them in another folder after sending them accordingly
How it works
- scan all emails in inbox folder
- send these emails to
- move each email to a submissions folder
- rinse and repeat
This script works for both Windows and Linux
import configparser
import datetime
import imaplib
import logging
from logging.handlers import RotatingFileHandler
import os
import platform
import signal
import socket
import sys
import time
import tempfile
import requests
# runtime computed defaults
THIS_DIR = os.path.dirname(__file__) # first place we look for config file
TEMP_DIR = tempfile.gettempdir()
if platform.system() == "Windows":
CACHE_DIR = os.getenv("APPDATA")
CACHE_DIR = os.getenv("HOME") + "/.config"
IMAP = None
def _proxies_dict(proxy_server):
return proxy dictionary as needed by requests library
if this helper function is not comprehensive enough for your use case, consult
if proxy_server:
return {'http': 'http://' + PROXY_SERVER, 'https': 'https://' + PROXY_SERVER}
return {}
def _handle_exit(sig, frame):
gracefully handle a killed process
raise SystemExit
class Config(configparser.ConfigParser):
ConfigParser implementation with validation
final_config = {} # contains config fields with correct data types
def __init__(self, config_file, inline_comment_prefixes=';'):
super().__init__(inline_comment_prefixes=inline_comment_prefixes) # each Section in the config file becomes a class attribute
def validate_config(self):
Assigns any missing fields in the configuration file to a default value
All configuration fields are then coerced to the correct data type
ConfigParser by default bans read values from converting to non-strings,
so all coerced fields are assigned to final_config
config_defaults = {
'imap_settings': {
'imap_server': '',
'source_folder': 'INBOX',
'completed_folder': 'PARSED'
'user_settings': {
'business_hours_start': 7,
'business_hours_end': 17,
'business_hours_interval': 300,
'non_business_hours_interval': 1800
'app_settings': {
'api_url': "",
'cache_dir': CACHE_DIR,
'log_dir': CACHE_DIR,
'debug': False,
'proxy_server': ""
# check for credentials first
if not self['credentials']:
raise ValueError('Missing credentials section in the config file')
if not self['credentials']['api_key']:
raise ValueError('Missing api_key in the config file')
if not self['credentials']['api_secret']:
raise ValueError('Missing api_secret in the config file')
self.final_config['credentials'] = self['credentials']
for section, fields in config_defaults.items():
# assign entirety of a section and its fields if missing from config file
if section not in self:
self.final_config[section] = fields
self.final_config[section] = {}
for field, val in fields.items():
# assign default value if missing from config file
if field not in self[section] or self[section][field] == '':
self.final_config[section][field] = val
# coerce strings to correct data type based on config_defaults
if isinstance(val, str):
self.final_config[section][field] = self[section][field].strip('\'"')
elif isinstance(val, bool):
self.final_config[section][field] = self.getboolean(section, field)
elif isinstance(val, int):
self.final_config[section][field] = self.getint(section, field)
elif isinstance(val, float):
self.final_config[section][field] = self.getfloat(section, field)
# mutate specific fields
cache_dir = self.final_config['app_settings']['cache_dir']
self.final_config['app_settings']['cache_path'] = os.path.join(
cache_dir, 'imap_daq_cache.dat')
log_dir = self.final_config['app_settings']['log_dir']
self.final_config['app_settings']['log_path'] = os.path.join(log_dir, 'imap_daq.log')
# finally add imap email credentials
if not self['imap_settings']['email_account']:
raise ValueError('Missing email account in the config file')
if not self['imap_settings']['password']:
raise ValueError('Missing email password in the config file')
self.final_config['imap_settings']['email_account'] = self['imap_settings']['email_account'].strip('\'"')
self.final_config['imap_settings']['password'] = self['imap_settings']['password'].strip('\'"')
def _safe_create_folder(folder_name):
create folder_name if it does not already exist
all_folders = [x.decode('utf-8').split(' "/" ')[1][1:-1] for x in IMAP.list()[1]]
if folder_name not in all_folders:
IMAP.create(folder_name)"created COMPLETED_FOLDER: %s", folder_name)
else:"folder %s already exists", folder_name)
def transmit_mail(data, uid):
given email data, transmit to API endpoint for Empirasign servers
rfc_bytes = data[0][1]
post_data = {
"api_key": CONF['credentials']['api_key'],
"api_secret": CONF['credentials']['api_secret'],
"rfc": rfc_bytes.decode("ascii")
resp =, json=post_data, timeout=15, proxies=_proxies_dict(PROXY_SERVER))
if resp.status_code == 200:
_ = IMAP.uid('MOVE', uid, COMPLETED_FOLDER) # move completed messages
logger.error("status code: %s, response: %s", resp.status_code, resp.text)
except Exception: # pylint: disable=broad-except
logger.exception("network or IO error")
def get_imap_uids(max_size=2500000):
return list of IMAP uids smaller than the max_size
if max_size:
search_str = "(SMALLER {})".format(int(max_size))
search_str = ""
status, data = IMAP.uid('search', None, search_str)
if status == "OK":
return sorted([x.decode("ascii") for x in data[0].split()])
return []
def main_loop():
scan inbox and deliver every detected email
status, _ =, readonly=False)
if status != "OK":
logger.warning("Invalid folder: '%s'", SOURCE_FOLDER)
raise ValueError
target_lst = get_imap_uids()
if not target_lst:"No new emails detected in folder: %s", SOURCE_FOLDER)
return"%s emails to parse in target folder: %s", len(target_lst), SOURCE_FOLDER)
for imap_uid in target_lst:
typ, data = IMAP.uid('fetch', imap_uid, '(RFC822)')
if typ == 'OK':
transmit_mail(data, imap_uid)
def main():
the main event
global IMAP # pylint: disable=global-statement
# authenticate imap login
IMAP = imaplib.IMAP4_SSL(CONF["imap_settings"]['imap_server'])
except socket.gaierror:
logger.error('Invalid imap server: %s', CONF["imap_settings"]['imap_server'])
IMAP.login(CONF["imap_settings"]['email_account'], CONF["imap_settings"]['password'])
except imaplib.IMAP4.error:
logger.error('Invalid email credentials')
if CONF["imap_settings"]['imap_server'] == '':"For gmail, make sure to also enable IMAP in "\
"account settings and 'less secure apps' in gmail settings "\
else:"Some services may require certain settings to "\
"be enabled for third-party apps")
return'sucessfully logged in to email')
active_start_time = datetime.time(CONF["user_settings"]['business_hours_start'], 0, 0)
active_end_time = datetime.time(CONF["user_settings"]['business_hours_end'], 0, 0)
while True:
curr_time =
if active_start_time <= curr_time <= active_end_time:
loop_time = CONF["user_settings"]['business_hours_interval']
loop_time = CONF["user_settings"]['non_business_hours_interval']
loop_time = 10
if __name__ == "__main__":
CONF_PATH = os.path.join(THIS_DIR, 'email_daq.ini')
if not os.path.exists(CONF_PATH):
CONF_PATH = os.path.join(CACHE_DIR, 'email_daq.ini')
CONF = Config(CONF_PATH, inline_comment_prefixes=";").final_config
# do this after loading config
DEBUG = CONF["app_settings"]["debug"]
URL = CONF["app_settings"]["api_url"]
PROXY_SERVER = CONF["app_settings"]["proxy_server"]
SOURCE_FOLDER = CONF["imap_settings"]["source_folder"]
COMPLETED_FOLDER = CONF["imap_settings"]["completed_folder"]
rfh = RotatingFileHandler(
maxBytes=5 * 1024 * 1024, # 5MB
format="%(asctime)s %(name)-25s %(levelname)-8s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[rfh, logging.StreamHandler(sys.stdout)])
logger = logging.getLogger()
signal.signal(signal.SIGTERM, _handle_exit)
except (KeyboardInterrupt, SystemExit):
logger.warning("user hit CTRL-C or killed process, now exiting")
if IMAP:
except Exception as e: # pylint: disable=broad-except
logger.exception('untrapped exception: %s' % e)'Please email a copy of this log file "%s" to',
if IMAP:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment