Skip to content

Instantly share code, notes, and snippets.

@nathants nathants/email.py

Last active Jun 5, 2020
Embed
What would you like to do?
a cli to interact with email via imap and smtp
#!/usr/bin/env python3
from dateutil import parser
from email.header import decode_header
from email.mime.text import MIMEText
import re
import datetime
import email
import os
import os
import smtplib
import sys
import shell # pip install git+https://github.com/nathants/py-shell
import argh # pip install argh
from imbox import Imbox # pip install imbox
from imapclient import IMAPClient # pip install imapclient
SMTP_FROM = os.environ.get('SMTP_FROM')
SMTP_TO = os.environ.get('SMTP_TO')
SMTP_SERVER = os.environ['SMTP_SERVER']
SMTP_USER = os.environ['SMTP_USER']
SMTP_PASSWORD = os.environ['SMTP_PASSWORD']
IMAP_SERVER = os.environ['IMAP_SERVER']
IMAP_USER = os.environ['IMAP_USER']
IMAP_PASSWORD = os.environ['IMAP_PASSWORD']
week_ago = (datetime.datetime.now() - datetime.timedelta(weeks=1)).strftime("%Y-%m-%d")
def send(subject, frm=SMTP_FROM, to=SMTP_TO):
msg = MIMEText(sys.stdin.read())
msg['Subject'] = subject
msg['From'] = frm
msg['To'] = to
s = smtplib.SMTP_SSL(SMTP_SERVER)
s.login(SMTP_USER, SMTP_PASSWORD)
s.send_message(msg)
s.quit()
def delete(id):
with Imbox(IMAP_SERVER, username=IMAP_USER, password=IMAP_PASSWORD) as imbox:
imbox.delete(id)
def move(id, folder):
with Imbox(IMAP_SERVER, IMAP_USER, IMAP_PASSWORD) as imbox:
imbox.move(id, folder)
def sizes(folder='ALL', flags=False, since=week_ago):
server = IMAPClient(IMAP_SERVER)
server.login(IMAP_USER, IMAP_PASSWORD)
server.select_folder(folder)
messages = server.search(['SINCE', datetime.datetime.strptime(since, "%Y-%m-%d")] if since else ['ALL'])
response = server.fetch(messages, ['FLAGS', 'RFC822.SIZE'])
print('id bytes flags', file=sys.stderr)
for msgid, data in response.items():
print(msgid, data[b'RFC822.SIZE'], b','.join(data[b'FLAGS']).decode('utf-8') if flags else '')
server.logout()
def show(folder, id, long=False):
server = IMAPClient(IMAP_SERVER)
server.login(IMAP_USER, IMAP_PASSWORD)
server.select_folder(folder)
resp = server.search(['UID', id])
[resp] = [(folder, id, data) for id, data in server.fetch(resp, 'RFC822').items()]
print_long(message(*resp))
server.logout()
def messages(folder='ALL', since=week_ago, long=False):
server = IMAPClient(IMAP_SERVER)
server.login(IMAP_USER, IMAP_PASSWORD)
resps = []
if folder == 'ALL':
folders = [name for flags, delim, name in server.list_folders() if name.lower() not in {'spam', 'junk', 'sent', 'outbox', 'trash'}]
else:
folders = [folder]
for folder in folders:
server.select_folder(folder)
resp = server.search(['SINCE', datetime.datetime.strptime(since, "%Y-%m-%d")] if since else ['ALL'])
resps += [(folder, id, data) for id, data in server.fetch(resp, 'RFC822').items()]
messages = [message(*args) for args in resps]
for msg in sorted(messages, key=lambda x: x['date'], reverse=True):
if long:
print_long(msg)
else:
print_short(msg)
server.logout()
def print_short(msg):
print(' | '.join([
msg['uid'].ljust(7)[:7],
msg['folder'].ljust(7)[:7],
msg['date'].strftime("%Y-%m-%dT%H:%M"),
msg['from'].split('<')[-1].split('>')[0].ljust(30)[:30],
msg['subject'].ljust(55)[:55],
msg['body'][:60].replace('\n', ' '),
]))
def print_long(msg):
print('=' * 60)
print(msg['uid'].ljust(10))
print('date:', msg['date'])
print('from:', msg['from'])
print('subject:', msg['subject'])
print('=' * 60)
print(msg['body'][:300])
if len(msg['body']) > 300:
print('...')
print('=' * 60)
print()
def message(folder, id, message_data):
msg = email.message_from_bytes(message_data[b'RFC822'])
payload = msg.get_payload()
if isinstance(payload, str):
lines = payload.splitlines()
lines = [x for x in lines if x.strip()]
else:
lines = [x
for p in payload
for x in p.as_string().splitlines()
if x.strip()
and not re.search(r'^Content\-[^:]+:', x)
and not x.strip() == 'charset=us-ascii'
and not x.strip() == 'charset=utf-8']
text = '\n'.join(lines)
subject = msg['Subject'].replace('\n', 'NEWLINE').replace('\r', '').replace('\t', ' ')
return {
'folder': folder,
'date': parser.parse(msg['Date']),
'uid': str(id),
'from': msg['From'].replace('\n', 'NEWLINE').replace('\r', ''),
'subject': ' '.join(x if isinstance(x, str) else x.decode(y or 'ascii') for x, y in decode_header(subject)),
'body': text.replace('\t', ' '),
}
if __name__ == '__main__':
shell.ignore_closed_pipes()
argh.dispatch_commands([
messages,
move,
send,
show,
sizes,
delete,
])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.