Skip to content

Instantly share code, notes, and snippets.

Last active Feb 24, 2022
What would you like to do?
demo script to parse runs in IMAP Inbox
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
demo script to parse runs in IMAP Inbox

connect to IMAP server
locate new messages in INBOX / SOURCE_FOLDER
for each new message
    send message to parser
    save parsed results JSON blob in temp directory
move parsed messages to COMPLETED_FOLDER
close IMAP server connection

Things this script does not do
- it's not terribly efficient with IMAP network traffic (no chunking of requests)
- it does not save results to a local database, only to local JSON files
- other than timestamps and SOURCE_FOLDER / COMPLETED_FOLDER message placements,
  it has no real sense of what needs to be parsed and what has already been parsed
"""
import datetime
import email
import imaplib
import json
import logging
import logging.handlers
from pathlib import Path
import sys
import tempfile
import time
from urllib.parse import quote
import requests
# Configuration Constants
API_KEY = "MY_API_KEY"  # provided by Empirasign
API_SECRET = "MY_API_SECRET"  # provided by Empirasign
# NOTE(review): the endpoint URL is blank in this copy of the script; set it to
# the parser endpoint from the API documentation before running.
API_URL = ''  # Corporate runs parser
# for other parsing endpoints, see docs:

# host:port of the outbound proxy server, or "" to connect directly
# NOTE(review): this definition was missing from this copy of the script; the
# empty default makes _proxies_dict() return {} (no proxy) -- confirm.
PROXY_SERVER = ""
# If you need to locate your proxy server, the easiest way to do this is by
# using our tool on the "proxy_finder" tab of the demo api spreadsheet,
# Click the "Find Proxy" button, results will show up in cell B4
# demo api spreadsheet can be found at:
# click on Excel icon to download

IMAP_HOST = ""  # or whomever your email backend provider is
# NOTE(review): the credential constants were missing from this copy of the
# script; the names below are placeholders -- fill in before running.
IMAP_USER = ""
IMAP_PASS = ""

# this script maintains state via folder usage
# once an email is moved into COMPLETED_FOLDER, no further parsing attempts
# will be made
SOURCE_FOLDER = "INBOX"
# NOTE(review): the original folder name was lost; "PROCESSED" is a placeholder.
COMPLETED_FOLDER = "PROCESSED"

# parsed results (and error responses) are written here as JSON files
RESULTS_DIR = Path(tempfile.gettempdir())

# NOTE(review): the original rotation settings were lost. Rollover only happens
# when both maxBytes and backupCount are non-zero, so both are set here.
rfh = logging.handlers.RotatingFileHandler(filename=tempfile.gettempdir() + "/parse_imap_inbox.log",
                                           maxBytes=5000000,
                                           backupCount=3)
# log to both the rotating file and stdout
# NOTE(review): level=INFO is assumed (the script's progress messages are
# informational); the original level argument was lost.
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s - %(levelname)-8s - %(message)s",
                    datefmt="%Y-%m-%d %H:%M:%S",
                    handlers=[rfh, logging.StreamHandler(sys.stdout)])
logger = logging.getLogger()
def _proxies_dict(proxy_server):
return proxy dictionary as needed by requests library
if this helper function is not comprehensive enough for your use case, consult
if proxy_server:
return {'http': 'http://' + PROXY_SERVER, 'https': 'https://' + PROXY_SERVER}
return {}
def get_imap_uids(imap, d0, d1=None, max_size=2500000):
    """
    return list of IMAP uids for a date or date range and max_size

    imap: connected imaplib client with a folder already selected
    d0: datetime.date, start of the range
    d1: datetime.date, end of the range; when omitted the range runs through
        tomorrow (NOTE(review): the default expression was garbled in this copy
        of the script; "today + 1 day" is the reconstruction -- confirm)
    max_size: only messages smaller than this many bytes are returned;
        pass a falsy value to disable the size filter

    returns the matching uids as a list of str in ascending numeric order, or
    an empty list when the server does not answer OK
    """
    if not d1:
        d1 = datetime.date.today() + datetime.timedelta(1)
    if d1 < d0:
        d0, d1 = d1, d0
    # SENTBEFORE matches dates strictly earlier than its argument, so push the
    # end date out one day to make the search inclusive of d1
    d1 += datetime.timedelta(1)
    date_str = 'SENTSINCE "{}" SENTBEFORE "{}"'.format(d0.strftime("%d-%b-%Y"),
                                                       d1.strftime("%d-%b-%Y"))
    if max_size:
        size_str = "SMALLER {}".format(int(max_size))
    else:
        size_str = ""
    search_items = [x for x in [size_str, date_str] if x]
    search_str = "(" + " ".join(search_items) + ")"
    logger.info("IMAP search string: %s", search_str)
    status, data = imap.uid('search', None, search_str)
    if status == "OK":
        # uids are numeric; sort by value so "10" does not come before "2"
        return sorted((x.decode("ascii") for x in data[0].split()), key=int)
    return []
def _safe_create_folder(imap, folder_name):
create folder_name if it does not already exist
all_folders = [x.decode('utf-8').split(' "/" ')[1][1:-1] for x in imap.list()[1]]
if folder_name not in all_folders:
imap.create(folder_name)"created COMPLETED_FOLDER: %s", folder_name)
else:"folder %s already exists", folder_name)
def main():
    """
    the main event

    connects to the IMAP server, finds candidate messages in SOURCE_FOLDER,
    posts each one to the parser API, saves every response as a JSON file in
    RESULTS_DIR and finally moves the successfully parsed messages to
    COMPLETED_FOLDER

    relies on the module-level configuration constants (IMAP_HOST, IMAP_USER,
    IMAP_PASS, SOURCE_FOLDER, COMPLETED_FOLDER, API_URL, API_KEY, API_SECRET,
    PROXY_SERVER, RESULTS_DIR) and on the module-level logger

    raises whatever imaplib raises when the connection or login fails (after
    logging it)
    """
    t0 = time.time()
    logger.info("parse_imap_inbox.py starting")
    try:
        imap = imaplib.IMAP4_SSL(IMAP_HOST, 993)
        # NOTE(review): the login call and credential constant names were lost
        # in this copy of the script; reconstructed -- confirm.
        imap.login(IMAP_USER, IMAP_PASS)
    except Exception:
        # if you get the following error
        # error: b'[AUTHENTICATIONFAILED] Invalid credentials (Failure)'
        # for Gmail / Google Workspace must enable "Less Secure app access"
        # other services may have other settings to enable
        # For example, Fastmail calls them App passwords
        # NOTE(review): the URL that followed "at" was lost in this copy.
        logger.exception("try enabling 'Less Secure app access' at")  # pylint: disable=line-too-long
        # nothing below can work without a connection
        raise

    done_lst = []
    try:
        _status, num_raw = imap.select(SOURCE_FOLDER, readonly=False)
        logger.info("target folder: %s, total messages (no filter): %s", SOURCE_FOLDER, num_raw)
        # NOTE(review): the start date was garbled in this copy (only ", 1, 1"
        # survived); the year 2022 is a guess -- confirm.
        target_lst = get_imap_uids(imap, datetime.date(2022, 1, 1))
        logger.info("%s emails to parse in target folder: %s", len(target_lst), SOURCE_FOLDER)
        if target_lst:
            _safe_create_folder(imap, COMPLETED_FOLDER)

        for imap_uid in target_lst:
            typ, data = imap.uid('fetch', imap_uid, '(RFC822)')
            if typ != "OK" or not data or not isinstance(data[0], tuple):
                logger.warning("could not fetch imap_uid: %s, skipping", imap_uid)
                continue
            # we can send email to parser
            rfc_bytes = data[0][1]
            try:
                rfc_text = rfc_bytes.decode("ascii")
            except UnicodeDecodeError:
                # one undecodable message should not abort the whole run
                logger.error("imap_uid: %s is not pure ASCII, skipping", imap_uid)
                continue
            post_data = {
                "api_key": API_KEY,
                "api_secret": API_SECRET,
                "rfc": rfc_text
            }
            # NOTE(review): the original timeout value was lost; 30 seconds is
            # a placeholder.
            resp = requests.post(API_URL, json=post_data, proxies=_proxies_dict(PROXY_SERVER),
                                 timeout=30)
            if resp.status_code != 200:
                # keep the raw response so the failure can be inspected later
                err_fname = imap_uid + "-error.json"
                with open(RESULTS_DIR / err_fname, "wt", encoding="utf-8") as fp:
                    fp.write(resp.text)
                logger.error("error occurred, http status code: %s, error file: %s",
                             resp.status_code, err_fname)
                continue

            obj = resp.json()
            # compute a filename that's safe on Windows and Linux
            message_id = email.message_from_bytes(rfc_bytes)["Message-ID"]
            if message_id is None:
                # no Message-ID header: fall back to the uid so quote() has a str
                fname = "imap-uid-" + imap_uid + ".json"
            else:
                fname = quote(str(message_id), safe="") + ".json"
                # reversible via unquote(fname)[:-5]
            # explicit utf-8: ensure_ascii=False can emit non-ASCII characters
            with open(RESULTS_DIR / fname, "wt", encoding="utf-8") as fp:
                fp.write(json.dumps(obj, sort_keys=True, indent=2, ensure_ascii=False))
            logger.info("saved parsed results imap_uid: %s, Message-ID: %s filename: %s",
                        imap_uid, message_id, fname)
            done_lst.append(imap_uid)
            # check remaining quota
            if obj["meta"]["api_req_left"] < 1:
                logger.warning("early exit of loop, daily quota exhausted, processed %s / %s",
                               len(done_lst), len(target_lst))
                break

        if done_lst:
            # move completed messages
            typ, data = imap.uid('MOVE', ",".join(done_lst), COMPLETED_FOLDER)
            if typ != "OK":
                logger.error("could not move messages to %s: %s", COMPLETED_FOLDER, data)
    finally:
        # always release the server connection, even when parsing failed
        imap.logout()
    logger.info("finished, total run time secs: {:,}".format(round(time.time() - t0)))
# run the parser only when executed as a script, not when imported
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment