Skip to content

Instantly share code, notes, and snippets.

@pew
Last active July 10, 2021 14:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pew/fcf8aa4d15329cf7ab3365ab6563a8c6 to your computer and use it in GitHub Desktop.
Save pew/fcf8aa4d15329cf7ab3365ab6563a8c6 to your computer and use it in GitHub Desktop.
e-mail to discord webhook
# 1. create receiving ses account
# 2. create action to save email to s3
# 4. configure lambda function to use environment variables defined below (HOOK_DOMAIN, HOOK_PATH and BUCKET_NAME)
# 5. allow lambda to read from s3 bucket
# 6. deployment with beautifulsoup:
# pip install beautifulsoup4
# cd $HOME/.pyenv/versions/email2discord/lib/python3.9/site-packages
# zip -r9 ${OLDPWD}/function.zip .
# cd ${OLDPWD}
# zip -g function.zip app.py
# aws lambda --region us-east-1 update-function-code --function-name yourFunctionName --zip-file fileb://function.zip
import http.client
import json
import os
import tempfile
from email.parser import Parser
from io import BytesIO
import boto3
import urllib3
from bs4 import BeautifulSoup
# Shared clients, created once at import time and used by the functions below.
session = boto3.Session()
s3_client = session.client("s3")
http_client = urllib3.PoolManager()
# NOTE(review): this module-level parser does not appear to be referenced in the
# visible code (parse_message builds its own Parser()) - confirm before removing.
parser = Parser()
# Required configuration: a missing variable raises KeyError when the module is imported.
HOOK_DOMAIN = os.environ["HOOK_DOMAIN"] # webhook host, e.g. discord.com
HOOK_PATH = os.environ["HOOK_PATH"] # webhook path with leading slash, e.g. /api/asdf/asdf
BUCKET_NAME = os.environ["BUCKET_NAME"] # S3 bucket holding the stored e-mails, e.g. support.example.com
def html2text(body):
    """Return the text content of the HTML document *body* with all markup removed."""
    return BeautifulSoup(body, "html.parser").get_text()
def parse_message(message_id):
    """Read an e-mail from S3 and split it into message, body and attachments.

    Args:
        message_id: Key of the object in ``BUCKET_NAME`` that holds the raw
            e-mail.

    Returns:
        A ``(incoming, body, attachments)`` tuple:

        * ``incoming`` - the parsed ``email.message.Message``.
        * ``body`` - the message text: ``bytes`` for a plain-text part,
          ``str`` for an HTML part (converted with ``html2text``), or an
          empty string when a multipart message has no text part at all.
        * ``attachments`` - list of ``{"name": ..., "payload": ...}`` dicts,
          one per image/video part marked as attachment or inline.
    """
    buffer = BytesIO()
    s3_client.download_fileobj(BUCKET_NAME, message_id, buffer)
    # errors="replace" keeps a stray non-UTF-8 byte from aborting the whole run.
    raw_text = buffer.getvalue().decode("utf-8", errors="replace")
    incoming = Parser().parsestr(raw_text)

    attachments = []
    # Default so a multipart message without any text part still returns cleanly.
    body = ""

    if incoming.is_multipart():
        for part in incoming.walk():
            ctype = part.get_content_type()
            cdispo = str(part.get("Content-Disposition")).lower()
            # Both dispositions must be tested against the header explicitly;
            # `"attachment" or "inline" in cdispo` is always truthy.
            is_file = "attachment" in cdispo or "inline" in cdispo
            if is_file and ctype.startswith(("image/", "video/")):
                attachments.append(
                    {
                        "name": part.get_filename(),
                        "payload": part.get_payload(decode=True),
                    }
                )
            # A later text part overrides an earlier one, so in a typical
            # multipart/alternative message the HTML rendering wins.
            if ctype == "text/plain":
                body = part.get_payload(decode=True)
            if ctype == "text/html":
                body = html2text(part.get_payload(decode=True))
    elif incoming.get_content_type() == "text/html":
        body = html2text(incoming.get_payload(decode=True))
    else:
        body = incoming.get_payload(decode=True)
    return incoming, body, attachments
def longform(title, body):
    """Upload a long message body to the webhook as a ``.txt`` file.

    Args:
        title: Used as the stem of the uploaded file name (``"<title>.txt"``).
        body: The message text, either ``str`` or ``bytes``.

    Returns:
        The HTTP status code of the webhook response.
    """
    # The handler passes a str; the previous binary temp file rejected it with
    # a TypeError. Encode once and hand the bytes straight to the uploader.
    data = body.encode("utf-8") if isinstance(body, str) else body
    # HOOK_PATH carries a leading slash (http.client needs it), so strip it
    # here to avoid producing "https://host//path".
    url = "https://{}/{}".format(HOOK_DOMAIN, HOOK_PATH.lstrip("/"))
    response = http_client.request(
        "POST",
        url,
        fields={
            "filefield": ("{}.txt".format(title), data, "text/plain"),
        },
    )
    return response.status
def send_attachment(attachments):
    """Upload each attachment to the webhook as a separate file.

    Args:
        attachments: Iterable of ``{"name": ..., "payload": ...}`` dicts as
            produced by ``parse_message``.

    Returns:
        The HTTP status code of the last upload, or ``None`` when nothing was
        sent.
    """
    # HOOK_PATH carries a leading slash (http.client needs it), so strip it
    # here to avoid producing "https://host//path".
    url = "https://{}/{}".format(HOOK_DOMAIN, HOOK_PATH.lstrip("/"))
    status = None
    for attachment in attachments:
        payload = attachment["payload"]
        if payload is None:
            # Nothing decodable in this part; skip instead of failing the batch.
            continue
        # Parts without a filename parameter still need a name for the upload.
        filename = attachment["name"] or "attachment"
        response = http_client.request(
            "POST",
            url,
            fields={
                "filefield": (filename, payload),
            },
        )
        status = response.status
    return status
def lambda_handler(event, context):
    """Forward the e-mail referenced by an S3 trigger event to the webhook.

    Image/video attachments are uploaded first. A body of 1800 characters or
    more is uploaded as a text file; a shorter body is posted as an embed.

    Args:
        event: S3 notification event; only the first record is processed.
        context: Lambda context object (unused).

    Returns:
        The HTTP status code from ``longform`` for long bodies, otherwise the
        raw response body (``bytes``) of the embed request.
    """
    from urllib.parse import unquote_plus

    # Object keys arrive URL-encoded in S3 event notifications.
    message_id = unquote_plus(event["Records"][0]["s3"]["object"]["key"])
    incoming, body, attachments = parse_message(message_id)
    if attachments:
        send_attachment(attachments)

    # parse_message yields bytes for plain-text parts and str for HTML parts.
    if isinstance(body, bytes):
        body = body.decode("utf-8", errors="replace")

    if len(body) >= 1800:
        return longform(incoming["From"], body)

    payload = json.dumps(
        {
            "username": "awesomehook",
            "embeds": [
                {
                    "title": incoming["Subject"],
                    "type": "rich",
                    "author": {"name": incoming["From"]},
                    "description": body,
                },
            ],
        }
    )
    headers = {"Content-Type": "application/json"}
    conn = http.client.HTTPSConnection(HOOK_DOMAIN)
    try:
        conn.request("POST", HOOK_PATH, payload, headers)
        return conn.getresponse().read()
    finally:
        # Release the socket even when the request or the read fails.
        conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment