Skip to content

Instantly share code, notes, and snippets.

@pistatium
Last active February 1, 2023 10:57
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pistatium/defe9cd71a9d0487b92d922f871ddbfa to your computer and use it in GitHub Desktop.
Save pistatium/defe9cd71a9d0487b92d922f871ddbfa to your computer and use it in GitHub Desktop.
任意の Slack チャンネルにメールを飛ばすための Lambda スクリプト

任意のチャンネルでメールを受信できるようにするための AWS Lambda(Python) スクリプトです。

説明リンク: TBD

import os
import json
import email
from email.iterators import typed_subpart_iterator
from email.header import decode_header
from html.parser import HTMLParser
from base64 import urlsafe_b64decode
from urllib.parse import urlencode
from urllib.request import Request, urlopen
import boto3
# Name of the S3 bucket that lambda_handler reads raw mail objects from;
# taken from the BUCKET_NAME environment variable (None when unset).
BUCKET_NAME = os.environ.get('BUCKET_NAME')
# URL that post_slack sends its request to; taken from the SLACK_WEBHOOK
# environment variable (None when unset).
SLACK_WEBHOOK = os.environ.get('SLACK_WEBHOOK')
# Fallback codec name used by get_charset and get_header when a message or
# header section does not declare a charset.
DEFAULT_ENCODING = 'ascii'
# Both objects are created once, when the module is imported.
s3 = boto3.resource('s3')
bucket = s3.Bucket(BUCKET_NAME)
def parse_mail(raw_mail):
    """Parse a raw mail message into the fields that are posted to Slack.

    Args:
        raw_mail: The complete message as bytes.

    Returns:
        A dict with the keys ``'body'``, ``'from'`` and ``'subject'``. A
        header that is absent from the message is passed to ``get_header``
        as an empty string.
    """
    msg = email.message_from_bytes(raw_mail)
    # Message.get() returns None for an absent header and
    # decode_header(None) raises TypeError, so substitute '' before the
    # value is handed to get_header.
    return {
        'body': get_body(msg),
        'from': get_header(msg.get('From', '')),
        'subject': get_header(msg.get('Subject', '')),
    }
def post_slack(params):
    """Send *params* as a JSON document to the URL in ``SLACK_WEBHOOK``.

    Args:
        params: JSON-serialisable payload.

    Returns:
        The response object returned by ``urlopen``. It is handed back
        still open.
    """
    payload = json.dumps(params).encode('utf-8')
    request = Request(
        SLACK_WEBHOOK,
        data=payload,
        headers={"Content-Type": "application/json"},
        method='POST',
    )
    return urlopen(request)
def get_charset(message, default=DEFAULT_ENCODING):
    """Return the name of the character set declared by *message*.

    Args:
        message: An ``email.message.Message`` (or one of its sub-parts).
        default: Value returned when the message declares no charset.

    Returns:
        The charset of the Content-Type header if present, otherwise the
        name of the charset attached to the message object, otherwise
        *default*.
    """
    content_charset = message.get_content_charset()
    if content_charset:
        return content_charset
    charset = message.get_charset()
    if charset:
        # Message.get_charset() returns an email.charset.Charset instance,
        # not a string. str() gives its charset name so the result can be
        # passed to bytes.decode() like the other return values.
        return str(charset)
    return default
def get_header(header_text):
    """Decode a (possibly RFC 2047 encoded) header value into one string.

    Args:
        header_text: The raw header value, or None when the header is
            absent from the message.

    Returns:
        The decoded sections concatenated in order; ``''`` when
        *header_text* is None.
    """
    if header_text is None:
        # decode_header() cannot handle None (absent header).
        return ''
    sections = []
    for text, charset in decode_header(header_text):
        if isinstance(text, str):
            sections.append(text)
            continue
        # Undecodable bytes are replaced instead of raising, so one broken
        # header does not abort the processing of the whole mail.
        try:
            sections.append(
                text.decode(charset or DEFAULT_ENCODING, errors='replace'))
        except LookupError:
            # The declared charset is not a codec Python knows about.
            sections.append(text.decode(DEFAULT_ENCODING, errors='replace'))
    return "".join(sections)
def _decode_payload(part, fallback_charset):
    """Return the transfer-decoded payload of *part* as text without raising
    on undecodable bytes or unknown charset names."""
    payload = part.get_payload(decode=True)
    if payload is None:
        # get_payload(decode=True) returns None for multipart containers.
        return ''
    # str() keeps this working even when get_charset hands back a
    # non-string charset object.
    charset = str(get_charset(part, fallback_charset))
    try:
        return payload.decode(charset, errors='replace')
    except LookupError:
        # The declared charset is not a codec Python knows about.
        return payload.decode(DEFAULT_ENCODING, errors='replace')


def get_body(message):
    """Extract a text body from *message*.

    For a multipart message the text/plain parts are preferred; when they
    are missing or empty, the text/html parts are used with their tags
    stripped. A non-multipart message is decoded and passed through
    ``strip_tags``.

    Args:
        message: An ``email.message.Message``.

    Returns:
        The body text as a string.
    """
    if message.is_multipart():
        outer_charset = get_charset(message)

        # Prefer the text/plain parts when there are any.
        plain_parts = [
            _decode_payload(part, outer_charset)
            for part in typed_subpart_iterator(message, 'text', 'plain')
        ]
        text = "\n".join(plain_parts).strip()
        if text:
            return text

        # Only text/html is available: strip the tags and return the rest.
        html_parts = [
            _decode_payload(part, outer_charset)
            for part in typed_subpart_iterator(message, 'text', 'html')
        ]
        return strip_tags("\n".join(html_parts).strip())

    # NOTE(review): a single-part text/plain body also goes through
    # strip_tags here, exactly as before — confirm that is intended.
    return strip_tags(_decode_payload(message, DEFAULT_ENCODING))
class TagStripper(HTMLParser):
    """HTML parser that collects the text content of the markup fed to it.

    Text inside ``skip_tags`` elements is dropped. A line break is inserted
    when an element listed in ``box_tags`` is closed and wherever a
    ``void_tags`` element (``<br>``) occurs. Call ``stripped()`` after
    feeding to obtain the collected text.
    """

    skip_tags = ['style']  # text enclosed in these tags is ignored
    # Closing one of these tags emits a line break.
    # NOTE(review): 'dir' may have been meant to be 'div' — confirm before
    # changing, since it would alter the output for most HTML mail.
    box_tags = ['h1', 'h2', 'h3', 'h4', 'dir', 'p', 'tr', 'br']
    # Elements written without an end tag; their line break has to be
    # emitted when the start tag is seen, because handle_endtag is never
    # called for a plain "<br>".
    void_tags = ['br']

    def __init__(self):
        # HTMLParser.__init__ already calls reset().
        super().__init__()
        self.fed = []
        self.skip = False

    def handle_data(self, d):
        """Collect a chunk of text unless it lies inside a skipped element."""
        if self.skip:
            return
        self.fed.append(d.strip())

    def handle_starttag(self, tag, attrs):
        """Start skipping on a skip tag; emit a line break for ``<br>``."""
        if tag in self.skip_tags:
            self.skip = True
        if tag in self.void_tags:
            self.fed.append('\n')

    def handle_startendtag(self, tag, attrs):
        """Handle ``<tag/>``; a self-closed void tag yields one line break."""
        self.handle_starttag(tag, attrs)
        # The default implementation would also call handle_endtag, which
        # would add a second line break for "<br/>".
        if tag not in self.void_tags:
            self.handle_endtag(tag)

    def handle_endtag(self, tag):
        """Emit a line break after a box tag; stop skipping after a skip tag."""
        if tag in self.box_tags:
            self.fed.append('\n')
        if tag in self.skip_tags:
            self.skip = False

    def stripped(self):
        """Return all collected text, without leading/trailing whitespace."""
        return ''.join(self.fed).strip()
def strip_tags(html):
    """Return the text content of *html* with every backtick escaped.

    Args:
        html: Markup (or plain text) as a string.

    Returns:
        The text collected by ``TagStripper``, with each backtick preceded
        by a backslash.
    """
    stripper = TagStripper()
    stripper.feed(html)
    # close() forces processing of any text feed() is still buffering
    # (for example trailing text containing an unterminated '&');
    # without it that text would be missing from the result.
    stripper.close()
    # '\\`' is a backslash followed by a backtick, the same value as the
    # former literal but without the invalid "\`" escape sequence.
    return stripper.stripped().replace('`', '\\`')
def lambda_handler(event, context):
    """Lambda entry point: forward every SES mail record in *event* to Slack.

    For each record that has an ``'ses'`` key, the raw mail is read from
    the bucket object whose key is the record's message id, parsed with
    ``parse_mail``, and posted to the Slack channel named after the local
    part (the text before ``@``) of the first recipient address.

    Args:
        event: The invocation event; its ``'Records'`` list is iterated.
        context: Lambda context object (unused).
    """
    for record in event['Records']:
        if 'ses' not in record:
            continue
        ses = record['ses']

        raw_mail = bucket.Object(ses['mail']['messageId']).get()['Body'].read()
        msg = parse_mail(raw_mail)

        recipient = ses['receipt']['recipients'][0]
        channel = '#' + recipient.split('@')[0]

        # The response is a context manager; leaving the block closes the
        # underlying connection instead of leaking it.
        with post_slack({
            'channel': channel,
            'username': ses['mail']['commonHeaders']['from'][0],
            'icon_emoji': ':mailbox:',
            'text': f"""件名: {msg['subject']}\n```{msg['body']}```"""
        }) as res:
            print(res.__dict__)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment