Last active
August 20, 2022 08:01
-
-
Save nathants/53efd551e5eadfdbf5792b3efc396b14 to your computer and use it in GitHub Desktop.
a cli to interact with email via imap and smtp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# The MIT License (MIT) | |
# Copyright (c) 2022-present Nathan Todd-Stone | |
# https://en.wikipedia.org/wiki/MIT_License#License_terms | |
from dateutil import parser | |
from email.header import decode_header | |
from email.mime.text import MIMEText | |
import contextlib | |
import re | |
import datetime | |
import email | |
import os | |
import os | |
import smtplib | |
import sys | |
import shell # pip install git+https://github.com/nathants/py-shell | |
import argh # pip install argh | |
from imbox import Imbox # pip install imbox | |
from imapclient import IMAPClient # pip install imapclient | |
# Optional defaults for the From/To headers used by `send`; None when unset.
SMTP_FROM = os.environ.get('SMTP_FROM')
SMTP_TO = os.environ.get('SMTP_TO')

# Required connection settings: a missing variable raises KeyError at import.
SMTP_SERVER = os.environ['SMTP_SERVER']
SMTP_USER = os.environ['SMTP_USER']
SMTP_PASSWORD = os.environ['SMTP_PASSWORD']
IMAP_SERVER = os.environ['IMAP_SERVER']
IMAP_USER = os.environ['IMAP_USER']
IMAP_PASSWORD = os.environ['IMAP_PASSWORD']

# Default `since` value for the listing commands: the date one week before
# startup, formatted as YYYY-MM-DD.
_started_at = datetime.datetime.now()
week_ago = (_started_at - datetime.timedelta(weeks=1)).strftime("%Y-%m-%d")
@contextlib.contextmanager
def server(folder=None):
    """Yield a logged-in IMAPClient, optionally with `folder` selected.

    The connection is logged out when the `with` block exits, whether it
    exits normally or with an exception. Exceptions propagate unchanged.

    folder: name of the folder to select before yielding; falsy selects none.
    """
    s = IMAPClient(IMAP_SERVER)
    try:
        # Login and folder selection happen inside the try so that a failure
        # in either still closes the connection opened above.
        s.login(IMAP_USER, IMAP_PASSWORD)
        if folder:
            s.select_folder(folder)
        yield s
    finally:
        s.logout()
def send(subject, frm=SMTP_FROM, to=SMTP_TO):
    """Send the text read from stdin as a plain-text email.

    subject: value of the Subject header.
    frm: value of the From header; defaults to the SMTP_FROM env variable.
    to: value of the To header; defaults to the SMTP_TO env variable.

    Raises ValueError when no sender or recipient is available.
    """
    # The defaults are None when the env variables are unset; fail here with
    # a clear message instead of deep inside smtplib.
    if not frm or not to:
        raise ValueError('sender and recipient are required: pass frm and to, or set SMTP_FROM and SMTP_TO')
    msg = MIMEText(sys.stdin.read())
    msg['Subject'] = subject
    msg['From'] = frm
    msg['To'] = to
    # The context manager closes the connection even when login or sending
    # raises, which the explicit quit() call did not.
    with smtplib.SMTP_SSL(SMTP_SERVER) as s:
        s.login(SMTP_USER, SMTP_PASSWORD)
        s.send_message(msg)
def delete(id):
    """Delete the message identified by `id` via an Imbox connection."""
    connection = Imbox(IMAP_SERVER, username=IMAP_USER, password=IMAP_PASSWORD)
    with connection as imbox:
        imbox.delete(id)
def move(id, folder):
    """Move the message identified by `id` into `folder` via an Imbox connection."""
    connection = Imbox(IMAP_SERVER, username=IMAP_USER, password=IMAP_PASSWORD)
    with connection as imbox:
        imbox.move(id, folder)
def sizes(folder='ALL', flags=False, since=week_ago):
    """Print the id and size in bytes of each matching message in `folder`.

    folder: folder to select.
    flags: when true, also print the message flags joined by commas.
    since: YYYY-MM-DD date used for a SINCE search; falsy searches ALL.

    A column header is written to stderr; one line per message goes to stdout.
    """
    # NOTE(review): unlike `messages`, the default 'ALL' is passed straight to
    # select_folder here -- confirm the server really has a folder named ALL.
    with server(folder) as s:
        if since:
            criteria = ['SINCE', datetime.datetime.strptime(since, "%Y-%m-%d")]
        else:
            criteria = ['ALL']
        found = s.search(criteria)
        response = s.fetch(found, ['FLAGS', 'RFC822.SIZE'])
        print('id bytes flags', file=sys.stderr)
        for msgid, data in response.items():
            if flags:
                flag_text = b','.join(data[b'FLAGS']).decode('utf-8')
            else:
                flag_text = ''
            print(msgid, data[b'RFC822.SIZE'], flag_text)
def show(folder, id, long=False):
    """Print one message in full (body not truncated).

    folder: folder that is selected before searching.
    id: UID of the message to show.
    long: unused; kept so the command keeps accepting the argument.

    Raises ValueError when the search for `id` in `folder` returns no message.
    """
    with server(folder) as s:
        found = s.search(['UID', id])
        fetched = s.fetch(found, 'RFC822')
        if not fetched:
            # Previously this surfaced as an opaque "not enough values to
            # unpack" ValueError from the destructuring below.
            raise ValueError('no message with uid {} in folder {}'.format(id, folder))
        [(uid, data)] = fetched.items()
        print_long(message(folder, uid, data), truncate=False)
def _is_noselect(flags):
    """Return True when a folder's LIST flags include \\Noselect."""
    return any(flag.lower() == b'\\noselect' for flag in flags)


def messages(folder='ALL', sender=None, since=week_ago, long=False):
    """Print matching messages, newest first.

    folder: folder to search. 'ALL' searches every folder reported by the
        server except spam, junk, sent, outbox and trash (case-insensitive
        name match) and folders flagged \\Noselect.
    sender: when given, restrict the search with a FROM criterion.
    since: YYYY-MM-DD date used for a SINCE search; falsy searches ALL.
    long: print each message with print_long instead of print_short.
    """
    excluded = {'spam', 'junk', 'sent', 'outbox', 'trash'}
    criteria = ['SINCE', datetime.datetime.strptime(since, "%Y-%m-%d")] if since else ['ALL']
    if sender:
        criteria += ['FROM', sender]
    fetched = []
    with server() as s:
        if folder == 'ALL':
            # \Noselect folders are pure containers (e.g. "[Gmail]"); trying
            # to select one fails and would abort the whole listing.
            folders = [
                name
                for flags, _delim, name in s.list_folders()
                if name.lower() not in excluded and not _is_noselect(flags)
            ]
        else:
            folders = [folder]
        for name in folders:
            s.select_folder(name)
            found = s.search(criteria)
            fetched += [(name, uid, data) for uid, data in s.fetch(found, 'RFC822').items()]
    parsed = [message(*args) for args in fetched]
    printer = print_long if long else print_short
    for msg in sorted(parsed, key=lambda m: m['date'], reverse=True):
        printer(msg)
def print_short(msg):
    """Print a one-line, ' | '-separated summary of a parsed message dict.

    Columns: uid, first 7 characters of the folder, date to the minute, the
    text between the last '<' and the following '>' of the sender, first 55
    characters of the subject, first 60 characters of the body with newlines
    turned into spaces.
    """
    columns = (
        msg['uid'],
        msg['folder'][:7],
        msg['date'].strftime("%Y-%m-%dT%H:%M"),
        msg['from'].rpartition('<')[2].partition('>')[0],
        msg['subject'][:55],
        msg['body'][:60].replace('\n', ' '),
    )
    print(' | '.join(columns))
def print_long(msg, truncate=True):
    """Print a parsed message dict as a header section followed by its body.

    msg: dict with 'folder', 'uid', 'date', 'from', 'subject' and 'body'.
    truncate: when true, print only the first 300 characters of the body and
        an ellipsis line if the body was longer.
    """
    rule = '-' * 80
    body = msg['body']
    shown = body[:300] if truncate else body
    print(rule)
    for field in ('folder', 'uid', 'date', 'from', 'subject'):
        print(field + ':', msg[field])
    print(rule)
    # Prefix every body line with the indent string.
    print(re.sub('^', ' ', shown, flags=re.M))
    if truncate and len(body) > 300:
        print(' ...')
    print(rule)
    print()
def _header_text(msg, name):
    """Return header `name` of `msg` as a str, or '' when it is absent."""
    value = msg[name]
    return '' if value is None else str(value)


def _decode_fragment(data, charset):
    """Turn one (data, charset) pair from decode_header into a str."""
    if isinstance(data, str):
        return data
    try:
        return data.decode(charset or 'ascii', errors='replace')
    except LookupError:
        # The header names a charset Python does not know.
        return data.decode('ascii', errors='replace')


def message(folder, id, message_data):
    """Parse one fetched message into the dict used by the print helpers.

    folder: folder the message was fetched from; copied into the result.
    id: message id; stored as a string under 'uid'.
    message_data: fetch response for the message; the raw bytes are read
        from its b'RFC822' entry.

    Returns a dict with 'folder', 'date' (a timezone-aware datetime), 'uid',
    'from', 'subject' and 'body'. Missing Subject or From headers become '';
    a missing or unparseable Date becomes 1970-01-01 UTC so that one odd
    message cannot abort a whole listing.
    """
    msg = email.message_from_bytes(message_data[b'RFC822'])
    payload = msg.get_payload()
    if isinstance(payload, str):
        lines = [x for x in payload.splitlines() if x.strip()]
    else:
        # Multipart: flatten every part to text and drop blank lines plus the
        # MIME header lines that as_string() includes.
        lines = [x
                 for p in payload
                 for x in p.as_string().splitlines()
                 if x.strip()
                 and not re.search(r'^Content\-[^:]+:', x)
                 and not x.strip() == 'charset=us-ascii'
                 and not x.strip() == 'charset=utf-8']
    text = '\n'.join(lines)

    subject = _header_text(msg, 'Subject').replace('\n', 'NEWLINE').replace('\r', '').replace('\t', ' ')

    unknown_date = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    raw_date = msg['Date']
    if raw_date is None:
        date = unknown_date
    else:
        try:
            date = parser.parse(str(raw_date))
        except (ValueError, OverflowError):
            date = unknown_date
    if date.tzinfo is None:
        # Callers sort on 'date'; naive and aware datetimes cannot be
        # compared, so a date without a zone is treated as UTC.
        date = date.replace(tzinfo=datetime.timezone.utc)

    return {
        'folder': folder,
        'date': date,
        'uid': str(id),
        'from': _header_text(msg, 'From').replace('\n', 'NEWLINE').replace('\r', ''),
        'subject': ' '.join(_decode_fragment(x, y) for x, y in decode_header(subject)),
        'body': text.replace('\t', ' '),
    }
if __name__ == '__main__':
    # Functions exposed as CLI subcommands, in the order they are registered.
    commands = [messages, move, send, show, sizes, delete]
    # Presumably keeps a closed stdout pipe (e.g. `| head`) from producing
    # noisy errors -- confirm against py-shell.
    shell.ignore_closed_pipes()
    argh.dispatch_commands(commands)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment