Skip to content

Instantly share code, notes, and snippets.

@muuuuwa
Created June 5, 2018 10:02
Show Gist options
  • Save muuuuwa/631139f9ba3d6a54e969ab9af761623f to your computer and use it in GitHub Desktop.
Save muuuuwa/631139f9ba3d6a54e969ab9af761623f to your computer and use it in GitHub Desktop.
メールをフィルタして、新着メールをSlack通知する
# coding: utf-8
import imapy
import requests
from imapy.query_builder import Q
import json
import os
# Credentials are taken from the environment when available so that real
# secrets do not have to be committed with the script. The literals are the
# original placeholder values, kept as fallbacks for backward compatibility.
PASSWD = os.environ.get("MAIL_PASSWORD", "xxxxxxxxx")
WEBHOOKURL = os.environ.get(
    "SLACK_WEBHOOK_URL",
    "https://hooks.slack.com/services/xxxxxxxxxx",
)
def archive(emails, to_box):
    """Move every mail in ``emails`` to the folder ``to_box`` and mark it as seen.

    Args:
        emails: Iterable of mail objects offering ``move`` and ``mark``.
        to_box: Name of the destination folder.
    """
    for message in emails:
        # The flag is set on the object returned by ``move`` and then once
        # more on the message itself, in that order.
        moved = message.move(to_box)
        moved.mark(["Seen"])
        message.mark("Seen")
def archive_sales(emails):
    """Archive ``emails`` into the ``INBOX.Archives.2018.Sales`` folder."""
    sales_box = "INBOX.Archives.2018.Sales"
    archive(emails, sales_box)
def archive_2018(emails):
    """Archive ``emails`` into the ``INBOX.Archives.2018`` folder."""
    yearly_box = "INBOX.Archives.2018"
    archive(emails, yearly_box)
def post_to_slack(message):
    """Post ``message`` to Slack through the incoming webhook ``WEBHOOKURL``.

    The payload is sent as a JSON body with a fixed channel, username and
    icon. A timeout is passed because ``requests`` otherwise waits without
    limit, which would let an unresponsive endpoint hang the whole script.

    Args:
        message: Text of the Slack post.

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed (including when the timeout expires).
    """
    payload = {
        "text": message,  # text to post
        "channel": "@hogehoge",
        "username": u"hogehoge",  # user name shown on the post
        "icon_emoji": u":ghost:",  # emoji used as the profile image
        "link_names": 1,  # enable mentions
    }
    requests.post(
        WEBHOOKURL,
        data=json.dumps(payload),
        # The body is JSON, so declare it as such.
        headers={"Content-Type": "application/json"},
        timeout=10,  # seconds
    )
# State file remembering the unread mails seen on the previous run.
UNSEEN_EMAIL_JSON = "unseen_emails.json"


def check_new_unread_mails(id_and_subject_dict):
    """Notify Slack only about unread mails not already reported last run.

    The ids stored in ``UNSEEN_EMAIL_JSON`` by the previous run are compared
    with the ids passed in; subjects of ids that were not stored are posted
    to Slack. The state file is then overwritten with the current mails.

    Args:
        id_and_subject_dict: Mapping of mail id to subject for every
            currently unread mail.
    """
    # JSON object keys are always strings, so ids are normalised to ``str``
    # before comparing. Otherwise non-string ids (e.g. ints) would never
    # match the stored ones and every unread mail would be re-announced on
    # every run.
    current = {str(uid): subject for uid, subject in id_and_subject_dict.items()}

    previous = {}
    if os.path.exists(UNSEEN_EMAIL_JSON):
        with open(UNSEEN_EMAIL_JSON, "r") as f:
            try:
                previous = json.load(f)
            except ValueError:
                # Unparseable state (e.g. a truncated write): start over
                # rather than failing on every subsequent run.
                previous = {}

    # Ids present now but absent last time, in the order they were passed in.
    new_ids = [uid for uid in current if uid not in previous]
    if new_ids:
        message = (
            "unread email number is : {}\n".format(len(id_and_subject_dict))
            + "new mail arrived. subjects are :"
            + ", \n".join([current[uid] for uid in new_ids])
        )
        post_to_slack(message)

    # Mode "w" truncates an existing file, so no prior removal is needed.
    with open(UNSEEN_EMAIL_JSON, "w") as f:
        json.dump(current, f)
# Connect to the mail server.
# NOTE(review): ssl is disabled on port 143, so the login is presumably sent
# unencrypted -- confirm whether the server accepts ssl=True.
box = imapy.connect(
    host="mail.hogehoge.com",
    username="hogehoge@hogehoge.com",
    password=PASSWD,
    port=143,
    ssl=False,
)
try:
    # filter sales mails
    q = Q()
    query = q.header("Delivered-To", "sales@hogehoge.com")
    sales_emails = box.folder("INBOX").emails(query)
    archive_sales(sales_emails)

    # archive ERP login notification
    q = Q()
    zac_emails = box.folder("INBOX").emails(q.sender("no-reply@jp.oro.com"))
    zac_login_emails = [
        email for email in zac_emails if "ZACログインのお知らせ" in email["subject"]
    ]
    archive_2018(zac_login_emails)

    # check unread mails
    q = Q()
    unseen_emails = box.folder("INBOX").emails(q.unseen())
    unseen_email_dict = {email.uid: email["subject"] for email in unseen_emails}
    check_new_unread_mails(unseen_email_dict)
finally:
    # Always log out, even when one of the steps above raises, so the
    # connection is not left open.
    box.logout()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment