Skip to content

Instantly share code, notes, and snippets.

@nathants nathants/email.py
Last active Dec 18, 2019

Embed
What would you like to do?
a cli to interact with email via imap and smtp
#!/usr/bin/env python3
from email.header import decode_header
from email.mime.text import MIMEText
import re
import datetime
import email
import os
import os
import smtplib
import sys
from dateutil import parser # pip install python-dateutil
import shell # pip install git+https://github.com/nathants/py-shell
import argh # pip install argh
from imbox import Imbox # pip install imbox
from imapclient import IMAPClient # pip install imapclient
week_ago = (datetime.datetime.now() - datetime.timedelta(weeks=1)).strftime("%Y-%m-%d")
def send(subject,
frm = os.environ.get('SMTP_FROM'),
to = os.environ.get('SMTP_TO')):
""
"""
usage: echo body | email subject --to user@host.com
env vars:
required: SMTP_SERVER, SMTP_USER, SMTP_PASSWORD
optional: SMTP_FROM, SMTP_TO
"""
msg = MIMEText(sys.stdin.read())
msg['Subject'] = subject
msg['From'] = frm
msg['To'] = to
s = smtplib.SMTP_SSL(os.environ['SMTP_SERVER'])
s.login(os.environ['SMTP_USER'], os.environ['SMTP_PASSWORD'])
s.send_message(msg)
s.quit()
def delete(id):
with Imbox(os.environ['IMAP_SERVER'], username=os.environ['IMAP_USER'], password=os.environ['IMAP_PASSWORD']) as imbox:
imbox.delete(id)
def move(id, folder):
with Imbox(os.environ['IMAP_SERVER'], username=os.environ['IMAP_USER'], password=os.environ['IMAP_PASSWORD']) as imbox:
imbox.move(id, folder)
def sizes(folder='INBOX', flags=False, since=week_ago):
HOST = os.environ['IMAP_SERVER']
USERNAME = os.environ['IMAP_USER']
PASSWORD = os.environ['IMAP_PASSWORD']
server = IMAPClient(HOST)
server.login(USERNAME, PASSWORD)
server.select_folder(folder)
messages = server.search(['SINCE', datetime.datetime.strptime(since, "%Y-%m-%d")] if since else ['ALL'])
response = server.fetch(messages, ['FLAGS', 'RFC822.SIZE'])
print('id bytes flags', file=sys.stderr)
for msgid, data in response.items():
print(msgid, data[b'RFC822.SIZE'], b','.join(data[b'FLAGS']).decode('utf-8') if flags else '')
server.logout()
def messages(folder='INBOX', since=week_ago, long=False):
HOST = os.environ['IMAP_SERVER']
USERNAME = os.environ['IMAP_USER']
PASSWORD = os.environ['IMAP_PASSWORD']
server = IMAPClient(HOST)
server.login(USERNAME, PASSWORD)
server.select_folder(folder)
messages = server.search(['SINCE', datetime.datetime.strptime(since, "%Y-%m-%d")] if since else ['ALL'])
for uid, message_data in list(server.fetch(messages, 'RFC822').items())[::-1]:
email_message = email.message_from_bytes(message_data[b'RFC822'])
payload = email_message.get_payload()
if long:
print('=' * 60)
print('id:', uid)
print('date:', email_message.get('Date'))
print('from:', email_message.get('From').replace('\n', ' ').replace('\r', ''))
print('subject:', ' '.join(x if isinstance(x, str) else x.decode(y or 'ascii') for x, y in decode_header(email_message.get('Subject').replace('\n', '\\n'))))
if isinstance(payload, str):
lines = payload.splitlines()
lines = [x for x in lines if x.strip()]
else:
lines = [x
for p in payload
for x in p.as_string().splitlines()
if x.strip()
and not re.search(r'^Content\-[^:]+:', x)
and not x.strip() == 'charset=us-ascii'
and not x.strip() == 'charset=utf-8']
text = '\n'.join(lines)
print('=' * 60)
print(text[:300])
print('=' * 60)
print()
else:
if isinstance(payload, str):
lines = payload.splitlines()
lines = [x for x in lines if x.strip()]
else:
lines = [x
for p in payload
for x in p.as_string().splitlines()
if x.strip()
and not re.search(r'^Content\-[^:]+:', x)
and not x.strip() == 'charset=us-ascii'
and not x.strip() == 'charset=utf-8']
text = '\n'.join(lines)
print(uid,
parser.parse(email_message.get('Date')).strftime("%Y-%m-%dT%H:%M"),
email_message.get('From').split('<')[-1].split('>')[0].ljust(50),
' '.join(x if isinstance(x, str) else x.decode(y or 'ascii') for x, y in decode_header(email_message.get('Subject').replace('\n', '\\n')))[:120])
server.logout()
if __name__ == '__main__':
shell.ignore_closed_pipes()
argh.dispatch_commands([
messages,
move,
send,
sizes,
delete,
])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.