Created
June 5, 2018 10:02
-
-
Save muuuuwa/631139f9ba3d6a54e969ab9af761623f to your computer and use it in GitHub Desktop.
メールをフィルタして、新着メールをSlack通知する
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
import imapy | |
import requests | |
from imapy.query_builder import Q | |
import json | |
import os | |
# Credentials and endpoint used by the script. Each value may be overridden
# through an environment variable so that real secrets do not have to be
# written into this file; the literal placeholders remain the fallback.
PASSWD = os.environ.get("IMAP_PASSWORD", "xxxxxxxxx")
WEBHOOKURL = os.environ.get(
    "SLACK_WEBHOOK_URL",
    "https://hooks.slack.com/services/xxxxxxxxxx",
)
def archive(emails, to_box):
    """Move each email into a folder and mark it as read.

    Args:
        emails: Iterable of message objects exposing ``move(folder)`` and
            ``mark(flags)``. ``move`` must return an object that also has
            ``mark``, because the two calls are chained.
        to_box: Name of the destination folder.
    """
    for email in emails:
        moved = email.move(to_box)
        moved.mark(["Seen"])
        # NOTE(review): this sets "Seen" a second time on the original
        # object; it looks redundant with the call above -- confirm against
        # the mail library before removing it.
        email.mark("Seen")
def archive_sales(emails):
    """Archive the given emails into the ``INBOX.Archives.2018.Sales`` folder."""
    archive(emails, "INBOX.Archives.2018.Sales")
def archive_2018(emails):
    """Archive the given emails into the ``INBOX.Archives.2018`` folder."""
    archive(emails, "INBOX.Archives.2018")
def post_to_slack(message):
    """Send ``message`` to Slack through the configured incoming webhook.

    Args:
        message: Text to post.

    Returns:
        The response object returned by ``requests.post``, so a caller may
        inspect the HTTP status if it wants to.
    """
    payload = {
        "text": message,  # text to post
        "channel": "@hogehoge",
        "username": u"hogehoge",  # display name used for the post
        "icon_emoji": u":ghost:",  # emoji used as the profile image
        "link_names": 1,  # enable @mentions
    }
    return requests.post(
        WEBHOOKURL,
        data=json.dumps(payload),
        headers={"Content-Type": "application/json"},
        # requests has no default timeout; without one a stalled connection
        # would block the script forever.
        timeout=10,
    )
# State file remembering which unread emails were seen on the previous run.
UNSEEN_EMAIL_JSON = "unseen_emails.json"


def check_new_unread_mails(id_and_subject_dict, notify=None):
    """Notify about unread emails that were not present on the previous run.

    The unread emails of the previous run are loaded from
    ``UNSEEN_EMAIL_JSON``. Every id in ``id_and_subject_dict`` that is missing
    from that saved state is reported in a single message, and the state file
    is then overwritten with the current mapping.

    Args:
        id_and_subject_dict: Mapping of email id to subject for all currently
            unread emails.
        notify: Optional callable taking the message text. Defaults to
            ``post_to_slack``.
    """
    if notify is None:
        notify = post_to_slack

    last_unseen_email_dict = {}
    if os.path.exists(UNSEEN_EMAIL_JSON):
        with open(UNSEEN_EMAIL_JSON, "r") as f:
            try:
                last_unseen_email_dict = json.load(f)
            except ValueError:
                # A truncated or corrupt state file is treated as "nothing
                # seen yet" rather than aborting the whole run.
                last_unseen_email_dict = {}
    if not isinstance(last_unseen_email_dict, dict):
        last_unseen_email_dict = {}

    # JSON object keys are always strings, so ids are compared in string form;
    # otherwise integer ids would never match the saved state and every
    # unread email would be reported again on each run.
    last_ids = {str(key) for key in last_unseen_email_dict}
    # Iterate the input mapping (not a set) so the subject order is stable.
    new_ids = [key for key in id_and_subject_dict if str(key) not in last_ids]

    if new_ids:
        message = (
            "unread email number is : {}\n".format(len(id_and_subject_dict))
            + "new mail arrived. subjects are :"
            + ", \n".join(id_and_subject_dict[key] for key in new_ids)
        )
        notify(message)

    # Mode "w" truncates an existing file, so no explicit removal is needed.
    with open(UNSEEN_EMAIL_JSON, "w") as f:
        json.dump(id_and_subject_dict, f)
# Script body: connect to the mailbox, archive routine mail, then report any
# newly arrived unread mail.
# NOTE(review): the connection is opened with ssl=False on port 143, so the
# login is presumably sent unencrypted -- confirm whether the server accepts
# ssl=True before relying on this outside a trusted network.
box = imapy.connect(
    host="mail.hogehoge.com",
    username="hogehoge@hogehoge.com",
    password=PASSWD,
    port=143,
    ssl=False,
)
try:
    # filter sales mails
    sales_query = Q().header("Delivered-To", "sales@hogehoge.com")
    sales_emails = box.folder("INBOX").emails(sales_query)
    archive_sales(sales_emails)

    # archive ERP login notification
    zac_emails = box.folder("INBOX").emails(Q().sender("no-reply@jp.oro.com"))
    zac_login_emails = [
        email for email in zac_emails if "ZACログインのお知らせ" in email["subject"]
    ]
    archive_2018(zac_login_emails)

    # check unread mails
    unseen_emails = box.folder("INBOX").emails(Q().unseen())
    unseen_email_dict = {email.uid: email["subject"] for email in unseen_emails}
    check_new_unread_mails(unseen_email_dict)
finally:
    # Always close the session, even when one of the steps above raises.
    box.logout()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment