Skip to content

Instantly share code, notes, and snippets.

@nathants
Last active August 20, 2022 08:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nathants/53efd551e5eadfdbf5792b3efc396b14 to your computer and use it in GitHub Desktop.
Save nathants/53efd551e5eadfdbf5792b3efc396b14 to your computer and use it in GitHub Desktop.
A CLI to interact with email via IMAP and SMTP.
#!/usr/bin/env python3
# The MIT License (MIT)
# Copyright (c) 2022-present Nathan Todd-Stone
# https://en.wikipedia.org/wiki/MIT_License#License_terms
from dateutil import parser
from email.header import decode_header
from email.mime.text import MIMEText
import contextlib
import re
import datetime
import email
import os
import os
import smtplib
import sys
import shell # pip install git+https://github.com/nathants/py-shell
import argh # pip install argh
from imbox import Imbox # pip install imbox
from imapclient import IMAPClient # pip install imapclient
# Default sender and recipient used by `send`. Both are optional: when the
# variable is unset the value is None.
SMTP_FROM = os.environ.get('SMTP_FROM')
SMTP_TO = os.environ.get('SMTP_TO')
# Required SMTP settings: a missing variable raises KeyError as soon as the
# module is loaded.
SMTP_SERVER = os.environ['SMTP_SERVER']
SMTP_USER = os.environ['SMTP_USER']
SMTP_PASSWORD = os.environ['SMTP_PASSWORD']
# Required IMAP settings, same rule as above.
IMAP_SERVER = os.environ['IMAP_SERVER']
IMAP_USER = os.environ['IMAP_USER']
IMAP_PASSWORD = os.environ['IMAP_PASSWORD']
# Default `since` value for the listing commands: the date one week before
# the module was loaded, formatted as YYYY-MM-DD from the naive local clock.
week_ago = (datetime.datetime.now() - datetime.timedelta(weeks=1)).strftime("%Y-%m-%d")
@contextlib.contextmanager
def server(folder=None):
    """Yield a logged-in IMAPClient connected to IMAP_SERVER.

    Args:
        folder: optional folder name to select before yielding. When falsy,
            no folder is selected and the caller must select one itself.

    Yields:
        The connected IMAPClient instance.

    The connection is logged out when the `with` body finishes or raises.
    Login and folder selection happen inside the same `try`, so a failure
    in either one also closes the connection instead of leaving it open.
    """
    s = IMAPClient(IMAP_SERVER)
    try:
        s.login(IMAP_USER, IMAP_PASSWORD)
        if folder:
            s.select_folder(folder)
        yield s
    finally:
        # LOGOUT is always attempted so the socket is released on every path.
        s.logout()
def send(subject, frm=SMTP_FROM, to=SMTP_TO):
    """Send the text read from stdin as a plain-text email.

    Args:
        subject: value of the Subject header.
        frm: value of the From header; defaults to SMTP_FROM.
        to: value of the To header; defaults to SMTP_TO.

    Raises:
        ValueError: if `frm` or `to` is empty or None (for example because
            the corresponding environment variable is not set).
    """
    # Fail early with a clear message instead of building a message whose
    # From/To header is None.
    if not frm or not to:
        raise ValueError('sender and recipient are required: pass frm/to or set SMTP_FROM/SMTP_TO')
    msg = MIMEText(sys.stdin.read())
    msg['Subject'] = subject
    msg['From'] = frm
    msg['To'] = to
    # The context manager sends QUIT and closes the socket even when login
    # or delivery raises.
    with smtplib.SMTP_SSL(SMTP_SERVER) as s:
        s.login(SMTP_USER, SMTP_PASSWORD)
        s.send_message(msg)
def delete(id):
    """Delete the message identified by `id` through an Imbox connection.

    Args:
        id: message identifier, passed unchanged to `Imbox.delete`.

    NOTE(review): no folder is selected here, so this presumably acts on
    whatever mailbox Imbox opens by default -- confirm against imbox.
    """
    connection = Imbox(IMAP_SERVER, username=IMAP_USER, password=IMAP_PASSWORD)
    with connection as mailbox:
        mailbox.delete(id)
def move(id, folder):
    """Move the message identified by `id` into `folder` through Imbox.

    Args:
        id: message identifier, passed unchanged to `Imbox.move`.
        folder: destination folder name, passed unchanged to `Imbox.move`.

    NOTE(review): no source folder is selected here, so this presumably
    acts on whatever mailbox Imbox opens by default -- confirm against imbox.
    """
    connection = Imbox(IMAP_SERVER, username=IMAP_USER, password=IMAP_PASSWORD)
    with connection as mailbox:
        mailbox.move(id, folder)
def sizes(folder='ALL', flags=False, since=week_ago):
    """Print the id and size in bytes of the messages found in `folder`.

    Args:
        folder: folder name handed to `server`, which selects it.
        flags: when true, also print the message flags joined with commas.
        since: YYYY-MM-DD date; only messages matching SINCE that date are
            listed. A falsy value lists every message in the folder.

    A header line goes to stderr; one line per message goes to stdout.

    NOTE(review): unlike `messages`, the default 'ALL' is not expanded to
    every folder; it is selected as a literal folder name -- confirm intent.
    """
    with server(folder) as s:
        if since:
            criteria = ['SINCE', datetime.datetime.strptime(since, "%Y-%m-%d")]
        else:
            criteria = ['ALL']
        found = s.search(criteria)
        details = s.fetch(found, ['FLAGS', 'RFC822.SIZE'])
        print('id bytes flags', file=sys.stderr)
        for msgid, data in details.items():
            if flags:
                shown_flags = b','.join(data[b'FLAGS']).decode('utf-8')
            else:
                shown_flags = ''
            print(msgid, data[b'RFC822.SIZE'], shown_flags)
def show(folder, id, long=False):
    """Print one message in full, without truncating its body.

    Args:
        folder: folder to select before searching.
        id: uid of the message to display.
        long: accepted for interface compatibility; the body does not use
            it, the message is always printed with `print_long`.

    Raises:
        ValueError: if the search does not yield exactly one message. An
            unknown uid now gets an explicit message instead of an opaque
            unpacking error.
    """
    with server(folder) as s:
        found = s.search(['UID', id])
        fetched = s.fetch(found, 'RFC822')
        if not fetched:
            raise ValueError('no message with uid {} in folder {}'.format(id, folder))
        # Exactly one (uid, data) pair is expected; anything else is an error.
        [(uid, data)] = fetched.items()
        print_long(message(folder, uid, data), truncate=False)
def _is_noselect(flags):
    """Return True when a folder's LIST flags include \\Noselect."""
    for flag in flags:
        text = flag.decode('ascii', 'replace') if isinstance(flag, bytes) else str(flag)
        if text.lower() == '\\noselect':
            return True
    return False


def _date_sort_key(msg):
    """Return the message date in a form that is always mutually comparable."""
    date = msg['date']
    # Comparing naive with aware datetimes raises TypeError, so a date
    # parsed without an offset is treated as UTC for ordering purposes only.
    if date.utcoffset() is None:
        date = date.replace(tzinfo=datetime.timezone.utc)
    return date


def messages(folder='ALL', sender=None, since=week_ago, long=False):
    """List messages, newest first.

    Args:
        folder: folder to read, or 'ALL' to read every folder returned by
            the server except spam, junk, sent, outbox and trash (matched
            case-insensitively) and folders flagged \\Noselect.
        sender: optional value for an additional FROM search criterion.
        since: YYYY-MM-DD date for a SINCE search criterion; a falsy value
            searches all messages.
        long: print each message with `print_long` instead of `print_short`.
    """
    skipped = {'spam', 'junk', 'sent', 'outbox', 'trash'}
    with server() as s:
        if folder == 'ALL':
            # \Noselect names cannot be selected, so including them would
            # make select_folder fail for the whole listing.
            folders = [
                name
                for flags, _delim, name in s.list_folders()
                if name.lower() not in skipped and not _is_noselect(flags)
            ]
        else:
            folders = [folder]
        if since:
            criteria = ['SINCE', datetime.datetime.strptime(since, "%Y-%m-%d")]
        else:
            criteria = ['ALL']
        if sender:
            criteria += ['FROM', sender]
        fetched = []
        for name in folders:
            s.select_folder(name)
            found = s.search(criteria)
            fetched += [(name, uid, data) for uid, data in s.fetch(found, 'RFC822').items()]
        parsed = [message(*args) for args in fetched]
        show_one = print_long if long else print_short
        for msg in sorted(parsed, key=_date_sort_key, reverse=True):
            show_one(msg)
def print_short(msg):
    """Print a one-line summary of a parsed message.

    Args:
        msg: dict with the keys 'uid', 'folder', 'date', 'from', 'subject'
            and 'body', as built by `message`.

    The fields are joined with ' | ': uid, the first 7 characters of the
    folder, the date to the minute, the text between '<' and '>' of the
    sender, the first 55 characters of the subject and the first 60
    characters of the body with newlines turned into spaces.
    """
    uid = msg['uid']
    folder = msg['folder'][:7]
    stamp = msg['date'].strftime("%Y-%m-%dT%H:%M")
    # Keep what follows the last '<' and precedes the next '>'.
    address = msg['from'].split('<')[-1].split('>')[0]
    subject = msg['subject'][:55]
    preview = msg['body'][:60].replace('\n', ' ')
    print(' | '.join((uid, folder, stamp, address, subject, preview)))
def print_long(msg, truncate=True):
    """Print a message as a header section followed by its indented body.

    Args:
        msg: dict with the keys 'folder', 'uid', 'date', 'from', 'subject'
            and 'body', as built by `message`.
        truncate: when true, only the first 300 characters of the body are
            printed, followed by an ellipsis line if the body was longer.
    """
    rule = '-' * 80
    print(rule)
    for field in ('folder', 'uid', 'date', 'from', 'subject'):
        print(field + ':', msg[field])
    print(rule)
    body = msg['body']
    shown = body[:300] if truncate else body
    # Prefix every line of the body with the indent string.
    print(re.sub('^', ' ', shown, flags=re.M))
    if truncate and len(body) > 300:
        print(' ...')
    print(rule)
    print()
def _header_text(msg, name):
    """Return header `name` of `msg` as a str, or '' when it is absent."""
    value = msg[name]
    # The lookup yields None for a missing header and may yield a Header
    # object rather than a str; str() normalises the latter.
    return '' if value is None else str(value)


def _decode_chunk(chunk, charset):
    """Decode one (value, charset) pair produced by decode_header."""
    if isinstance(chunk, str):
        return chunk
    try:
        return chunk.decode(charset or 'ascii', errors='replace')
    except LookupError:
        # The declared charset is not known to Python.
        return chunk.decode('utf-8', errors='replace')


def message(folder, id, message_data):
    """Parse one fetched message into a plain dict for printing.

    Args:
        folder: name of the folder the message came from; stored as is.
        id: message id; stored as a string under 'uid'.
        message_data: mapping holding the raw message bytes under the
            b'RFC822' key.

    Returns:
        A dict with the keys 'folder', 'date' (parsed by dateutil), 'uid',
        'from', 'subject' and 'body'.

    Raises:
        Whatever `parser.parse` raises when the Date header is missing or
        cannot be parsed.

    Missing Subject or From headers become empty strings, and header
    chunks with an unknown charset or undecodable bytes are decoded with
    replacement characters instead of raising.
    """
    msg = email.message_from_bytes(message_data[b'RFC822'])
    payload = msg.get_payload()
    if isinstance(payload, str):
        # Single-part message: keep the non-blank lines of the payload.
        lines = [x for x in payload.splitlines() if x.strip()]
    else:
        # Otherwise flatten every part to text and drop blank lines, the
        # Content-* header lines and bare charset continuation lines.
        lines = [x
                 for p in payload
                 for x in p.as_string().splitlines()
                 if x.strip()
                 and not re.search(r'^Content\-[^:]+:', x)
                 and not x.strip() == 'charset=us-ascii'
                 and not x.strip() == 'charset=utf-8']
    text = '\n'.join(lines)
    subject = _header_text(msg, 'Subject').replace('\n', 'NEWLINE').replace('\r', '').replace('\t', ' ')
    return {
        'folder': folder,
        'date': parser.parse(msg['Date']),
        'uid': str(id),
        'from': _header_text(msg, 'From').replace('\n', 'NEWLINE').replace('\r', ''),
        'subject': ' '.join(_decode_chunk(x, y) for x, y in decode_header(subject)),
        'body': text.replace('\t', ' '),
    }
if __name__ == '__main__':
    # Script entry point: expose each function below as a CLI subcommand.
    shell.ignore_closed_pipes()
    commands = [messages, move, send, show, sizes, delete]
    argh.dispatch_commands(commands)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment