Skip to content

Instantly share code, notes, and snippets.

@vinovator
Last active April 19, 2016 20:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vinovator/588abbeaebf7921037f3d74f47d66196 to your computer and use it in GitHub Desktop.
Save vinovator/588abbeaebf7921037f3d74f47d66196 to your computer and use it in GitHub Desktop.
Mail bot sript that sends predefined response to predefined mails. Intended for raspberry pi, which has its dedicated mail id
# mailbot.py
# python 2.7.6
"""
Mail bot sript that sends predefined response to predefined mails
Intended for raspberry pi, which has its dedicated mail id
Algorithm
1) Check a dedicated mailbox inbox for "unread" mails
2) For each "unread" mail, fetch the sender, subject and content
3) If the sender is from a pre-approved list, proceed
4) If the subject is from a pre-approved list, proceed
5) Optionally check for content as well, depending upon subject
6) Based on the subject determine the relevant "predefined" response
7) Send the response back to the pre-approved sender
8) Mark the mail as read
9) Configure the script in a cron job or windows scheduler to poll every X mins
"""
import imaplib
import smtplib
import email
from IGNORE import raspi_secrets # Untracked py to carry secrets
from UTILS import mailmap # Mapping between mail subject and response
IMAP_server = "imap.gmail.com" # google imap server
SMTP_server = "smtp.gmail.com" # google smtp server
# List of constants inherited from an untracked file
mail_id = raspi_secrets.login["mail"]
pwd = raspi_secrets.login["password"]
# List of authorized mail ids
authorized_senders = raspi_secrets.authorized_senders
# Dict with subject - response pairs
mail_map = mailmap.mail_map
def imap_login():
# Instantiate IMAP interface
imap = imaplib.IMAP4_SSL(IMAP_server)
# Login to IMAP server
try:
status, login_summary = imap.login(mail_id, pwd)
if status == "OK":
return status, imap
else:
return "login failure", None
except imaplib.IMAP4.error as e:
return "login exception: " + str(e), None
def get_unread_mail(imap):
""" Connect to inbox and fetch unread mails """
# Select inbox folder
inbox_status, data = imap.select("Inbox")
if not inbox_status == "OK":
return inbox_status, None
# Search for unread mails
unread_mail_status, messages = imap.search(None, "(UNSEEN)")
return unread_mail_status, messages
def is_authorized(mail_id):
""" Check if the given mail id is from pre-authorized list """
if email.utils.parseaddr(mail_id)[1] in authorized_senders:
return True
else:
# TODO: Notify owner
return False
def process_mail(message_data):
""" Get the sender and subject metadata from each mail """
for section in message_data:
if isinstance(section, tuple):
message = email.message_from_string(section[1])
sender = message["From"]
subject = message["Subject"]
if is_authorized(sender):
return (sender, subject) # return as tuple
else:
return None
def smtp_login():
""" Login to smtp server """
smtpObj = smtplib.SMTP(SMTP_server, 587) # 587 indicates TLS encryption
# Send a hello message to establish connection, returns 250 if success
smtpObj.ehlo()
smtpObj.starttls() # start TLS encryption
# login to server, 235 is returned if success
smtpObj.login(mail_id, pwd)
return smtpObj
def send_mail(smtpObj, mail_id, recieved_subject):
""" Given the mail subject, provide pre-defined response """
# Send mail
to_name, to_email = email.utils.parseaddr(mail_id)
mail_subject = mail_map[recieved_subject]
mail_body = "Hello {0}, {1}. Cheers".format(to_name, mail_subject)
smtpObj.sendmail(mail_id,
to_email,
"subject:" + mail_subject + "\n" + mail_body)
if __name__ == "__main__":
""" Starting block """
error_message = ""
send_mail_queue = list()
# login to the IMAP server
login_status, imapObj = imap_login()
if not login_status == "OK":
error_message = login_status
# Fetch unread mail ids
mail_status, messages = get_unread_mail(imapObj)
if not mail_status == "OK":
error_message = mail_status
# Get data from each unread mail message
for mail_num in messages[0].split(): # e.g. for 2 mails ['1 2']
message_code, message_data = imapObj.fetch(mail_num, "(RFC822)")
if message_code == "OK":
# returns tuple - mail_id, subject
entry = process_mail(message_data)
if entry:
send_mail_queue.append(entry)
# Close and logout of IMAP server
imapObj.close()
imapObj.logout()
# Login to SMTP server
if len(send_mail_queue) > 0:
smtp = smtp_login()
# Send mail response from SMTP server
for item in send_mail_queue:
mail_id = item[0]
subject = item[1].lower()
if subject in mail_map.keys():
print "Dict match found"
send_mail(smtp, mail_id, subject)
else:
pass
# TODO: Notify sender abour unrecognized subject
# Finally disconnect from SMTP server
smtp.quit()
# Finally if any error message, print it
if error_message:
print error_message
# mailbot_py3.py
# python 3.5
"""
Mail bot sript that sends predefined response to predefined mails
Intended for raspberry pi, which has its dedicated mail id
Algorithm
1) Check a dedicated mailbox inbox for "unread" mails
2) For each "unread" mail, fetch the sender, subject and content
3) If the sender is from a pre-approved list, proceed
4) If the subject is from a pre-approved list, proceed
5) Optionally check for content as well, depending upon subject
6) Based on the subject determine the relevant "predefined" response
7) Send the response back to the pre-approved sender
8) Mark the mail as read
9) Configure the script in a cron job or windows scheduler to poll every X mins
"""
import imaplib
import smtplib
import email
from IGNORE import raspi_secrets # Untracked py to carry secrets
from UTILS import mailmap # Mapping between mail subject and response
IMAP_server = "imap.gmail.com" # google imap server
SMTP_server = "smtp.gmail.com" # google smtp server
# List of constants inherited from an untracked file
mail_id = raspi_secrets.login["mail"]
pwd = raspi_secrets.login["password"]
# List of authorized mail ids
authorized_senders = raspi_secrets.authorized_senders
# Dict with subject - response pairs
mail_map = mailmap.mail_map
def imap_login():
# Instantiate IMAP interface
imap = imaplib.IMAP4_SSL(IMAP_server)
# Login to IMAP server
try:
status, login_summary = imap.login(mail_id, pwd)
if status == "OK":
return status, imap
else:
return "login failure", None
except imaplib.IMAP4.error as e:
return "login exception: " + str(e), None
def get_unread_mail(imap):
""" Connect to inbox and fetch unread mails """
# Select inbox folder
inbox_status, data = imap.select(mailbox="INBOX")
if not inbox_status == "OK":
return inbox_status, None
# Search for unread mails
unread_mail_status, messages = imap.search(None, "(UNSEEN)")
return unread_mail_status, messages
def is_authorized(mail_id):
""" Check if the given mail id is from pre-authorized list """
if email.utils.parseaddr(mail_id)[1] in authorized_senders:
return True
else:
# TODO: Notify owner
return False
def process_mail(message_data):
""" Get the sender and subject metadata from each mail """
for section in message_data:
if isinstance(section, tuple):
# for python 2.x, change this to message_from_string
message = email.message_from_bytes(section[1])
sender = message["From"]
subject = message["Subject"]
if is_authorized(sender):
return (sender, subject) # return as tuple
else:
return None
def smtp_login():
""" Login to smtp server """
smtpObj = smtplib.SMTP(SMTP_server, 587) # 587 indicates TLS encryption
# Send a hello message to establish connection, returns 250 if success
smtpObj.ehlo()
smtpObj.starttls() # start TLS encryption
# login to server, 235 is returned if success
smtpObj.login(mail_id, pwd)
return smtpObj
def send_mail(smtpObj, mail_id, recieved_subject):
""" Given the mail subject, provide pre-defined response """
# Send mail
to_name, to_email = email.utils.parseaddr(mail_id)
mail_subject = mail_map[recieved_subject]
mail_body = "Hello {0}, {1}. Cheers".format(to_name, mail_subject)
smtpObj.sendmail(mail_id,
to_email,
"subject:" + mail_subject + "\n" + mail_body)
def main():
""" Script that controls all function calls """
send_mail_queue = list()
# login to the IMAP server
login_status, imapObj = imap_login()
if not login_status == "OK":
return login_status
# Fetch unread mail ids
mail_status, messages = get_unread_mail(imapObj)
if not mail_status == "OK":
return mail_status
# Get data from each unread mail message
for mail_num in messages[0].split(): # e.g. for 2 mails ['1 2']
message_code, message_data = imapObj.fetch(mail_num, "(RFC822)")
if message_code == "OK":
# returns tuple - mail_id, subject
entry = process_mail(message_data)
if entry:
send_mail_queue.append(entry)
# Close and logout of IMAP server
imapObj.close()
imapObj.logout()
# Login to SMTP server
if len(send_mail_queue) > 0:
smtp = smtp_login()
# Send mail response from SMTP server
for item in send_mail_queue:
mail_id = item[0]
subject = item[1].lower()
if subject in mail_map.keys():
send_mail(smtp, mail_id, subject)
else:
pass
# TODO: Notify sender abour unrecognized subject
# Finally disconnect from SMTP server
smtp.quit()
return None
if __name__ == "__main__":
""" Starting block """
error_message = main()
# Finally if any error message, print it
if error_message:
print(error_message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment