Created
October 7, 2017 02:03
-
-
Save cawka/7c0d18ddc4b95a639d37e176d830d77e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import csv | |
import logging | |
import os | |
import re | |
import smtplib | |
import sys | |
import argparse | |
from datetime import datetime | |
from email.mime.multipart import MIMEMultipart | |
from email.mime.text import MIMEText | |
from time import sleep | |
import config # Our file config.py | |
# setup logging to specified log file | |
logging.basicConfig(filename=config.LOG_FILENAME, level=logging.DEBUG) | |
class PyMailer():
    """
    A python bulk mailer commandline utility.

    Renders a plain-text and/or HTML template for every recipient listed in
    a CSV file (or in an explicit recipient list) and sends the result over
    SMTP. Connection settings, file locations and defaults are read from the
    ``config`` module.
    """

    def __init__(self, args, **kwargs):
        """
        Store the command line arguments and the optional overrides.

        ``args`` must expose ``txt``, ``html``, ``addresses`` and ``subject``,
        each as a one-element list. Recognised keyword overrides are
        ``from_name``, ``from_email`` and ``nb_emails_per_recipient``; each
        falls back to the matching value in ``config``.
        """
        self.txt_path = args.txt[0]
        self.html_path = args.html[0]
        self.csv_path = args.addresses[0]
        self.subject = args.subject[0]
        self.from_name = kwargs.get('from_name', config.FROM_NAME)
        # The sender address used to be read from the misnamed 'to_name'
        # key; it is still honoured so existing callers keep working.
        self.from_email = kwargs.get(
            'from_email', kwargs.get('to_name', config.FROM_EMAIL))
        self.nb_emails_per_recipient = kwargs.get(
            'nb_emails_per_recipient', config.NB_EMAILS_PER_RECIPIENT)

    def _stats(self, message):
        """
        Record ``message`` in the stats file.

        Entries are identified by their first five characters: an existing
        line with the same prefix is overwritten, otherwise the message is
        appended. Raises IOError when the stats file cannot be read.
        """
        try:
            with open(config.STATS_FILE, 'r') as stats_file:
                stats_entries = stats_file.read().split('\n')
        except IOError as err:
            raise IOError("Invalid or missing stats file path.") from err

        # check if the stats entry exists if it does overwrite it with the new message
        is_existing_entry = False
        for i, entry in enumerate(stats_entries):
            if entry and message[:5] == entry[:5]:
                stats_entries[i] = message
                is_existing_entry = True

        # if the entry does not exist append it to the file
        if not is_existing_entry:
            stats_entries.append(message)

        with open(config.STATS_FILE, 'w') as stats_file:
            for entry in stats_entries:
                if entry:
                    stats_file.write("%s\n" % entry)

    def _validate_email(self, email_address):
        """
        Validate the supplied email address.

        Returns the address unchanged when it is well formed, otherwise None.
        The whole string must match (no trailing junk) and letter case is
        ignored.
        """
        if not email_address or len(email_address) < 5:
            return None
        if not re.fullmatch(r"[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*@(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?", email_address, re.IGNORECASE):
            return None
        return email_address

    def _retry_handler(self, recipient_data):
        """
        Write failed recipient_data to csv file to be retried again later.

        Rows are appended so that every failure of a pass is kept. A
        ``name,email`` header is written first when the file is empty,
        because _parse_csv treats the first row as the header. Only the name
        and email are stored; other template variables are not carried over
        to the retry pass.
        """
        retry_path = config.CSV_RETRY_FILENAME
        needs_header = (not os.path.exists(retry_path)
                        or os.path.getsize(retry_path) == 0)
        try:
            # newline='' is what the csv module expects for files it writes
            with open(retry_path, 'a', encoding='utf-8', newline='') as csv_file:
                csv_writer = csv.writer(csv_file)
                if needs_header:
                    csv_writer.writerow(['name', 'email'])
                csv_writer.writerow([
                    recipient_data.get('name'),
                    recipient_data.get('email')
                ])
        except IOError as err:
            raise IOError("Invalid or missing csv file path.") from err

    @staticmethod
    def _prepare_text(template, recipient_data):
        """
        Open, parse and substitute placeholders with recipient data.

        Every key of ``recipient_data`` replaces the placeholder
        ``%%KEY%%`` (key upper-cased) with the string form of its value.
        Raises IOError when the template cannot be read and Exception when
        it is empty.
        """
        try:
            with open(template, 'rt', encoding='utf-8') as template_file:
                content = template_file.read()
        except IOError as err:
            raise IOError("Invalid or missing template file path.") from err
        if not content:
            raise Exception("The template file is empty.")

        # replace all placeholders associated to recipient_data keys
        if recipient_data:
            for key, value in recipient_data.items():
                placeholder = "%%%%%s%%%%" % key.upper()
                # str() so non-string values do not break str.replace
                content = content.replace(placeholder, str(value))
        return content

    def _form_email(self, recipient_data):
        """
        Form the email, including mimetype and headers.

        Attaches the rendered text and/or HTML template (whichever path is
        not empty) to a multipart/alternative message and returns the
        message as a string. ``recipient_data`` must already carry the
        'sender' and 'recipient' header values.
        """
        # instantiate the email object and assign headers
        email_message = MIMEMultipart('alternative')
        if self.txt_path != "":
            email_message.attach(MIMEText(PyMailer._prepare_text(self.txt_path, recipient_data), 'plain'))
        if self.html_path != "":
            email_message.attach(MIMEText(PyMailer._prepare_text(self.html_path, recipient_data), 'html'))
        email_message['From'] = recipient_data.get('sender')
        email_message['To'] = recipient_data.get('recipient')
        email_message['Subject'] = self.subject
        return email_message.as_string()

    def _parse_csv(self, csv_path=None, clear_after_read=False):
        """
        Parse the entire csv file and return a list of dicts.

        The first row is the header; each following row becomes a dict keyed
        by the header names. Rows whose 'email' value is missing or invalid
        are ignored, as are blank rows. ``csv_path`` defaults to the
        recipients file given on the command line. When
        ``clear_after_read`` is true the file is emptied once it has been
        read (used for the retry file).
        """
        if not csv_path:
            csv_path = self.csv_path
        # write access is only needed when the file has to be emptied
        mode = 'r+' if clear_after_read else 'r'
        try:
            csv_file = open(csv_path, mode, encoding='utf-8', newline='')
        except IOError as err:
            raise IOError("Invalid or missing csv file path.") from err

        recipients_list = []
        with csv_file:
            csv_reader = csv.reader(csv_file)
            variables_names = next(csv_reader, [])
            for row in csv_reader:
                if not row:
                    continue
                # zip tolerates rows shorter than the header
                variables = dict(zip(variables_names, row))
                if self._validate_email(variables.get('email')):
                    recipients_list.append(variables)
            if clear_after_read:
                csv_file.seek(0)
                csv_file.truncate()
        return recipients_list

    def _deliver(self, recipient_data, message):
        """
        Open an SMTP connection as configured and send one message.

        The connection is always closed, also when sending fails.
        """
        if config.ENCRYPT_MODE == 'ssl':
            smtp_class = smtplib.SMTP_SSL
        else:
            smtp_class = smtplib.SMTP
        with smtp_class(host=config.SMTP_HOST, port=config.SMTP_PORT, timeout=10) as smtp_server:
            if config.ENCRYPT_MODE != 'none':
                smtp_server.ehlo()
            if config.ENCRYPT_MODE == 'starttls':
                smtp_server.starttls()
                smtp_server.ehlo()
            if config.SMTP_USER and config.SMTP_PASSWORD:
                smtp_server.login(config.SMTP_USER, config.SMTP_PASSWORD)
            # The envelope uses the bare addresses; the display names are
            # already part of the From/To headers inside the message.
            smtp_server.sendmail(self.from_email, recipient_data.get('email'), message)

    def send(self, retry_count=0, recipient_list=None):
        """
        Iterate over the recipient list and send the specified email.

        With a non-zero ``retry_count`` the recipients are taken from the
        retry file (which is emptied after reading). Otherwise
        ``recipient_list`` is used, or the recipients CSV when no list is
        given. Recipients whose delivery fails are written to the retry file
        and counted in the stats file.
        """
        if config.ENCRYPT_MODE not in ('none', 'ssl', 'starttls'):
            raise Exception("Please choose a correct ENCRYPT_MODE")

        if retry_count:
            recipient_list = self._parse_csv(config.CSV_RETRY_FILENAME, clear_after_read=True)
        elif recipient_list is None:
            # Only a missing list falls back to the CSV; an explicitly empty
            # list must not end up mailing every recipient of the CSV.
            recipient_list = self._parse_csv()

        # save the number of recipient and time started to the stats file
        if not retry_count:
            self._stats("TOTAL RECIPIENTS: %s" % len(recipient_list))
            self._stats("START TIME: %s" % datetime.now())

        # instantiate the number of failed recipients
        failed_recipients = 0

        for recipient_data in recipient_list:
            if recipient_data.get('name'):
                recipient_data['recipient'] = "%s <%s>" % (recipient_data.get('name'), recipient_data.get('email'))
            else:
                recipient_data['recipient'] = recipient_data.get('email')
            recipient_data['sender'] = "%s <%s>" % (self.from_name, self.from_email)

            # instantiate the required vars to send email
            message = self._form_email(recipient_data)

            for _ in range(self.nb_emails_per_recipient):
                print("Sending to %s..." % recipient_data.get('recipient'))
                try:
                    # send the actual email
                    self._deliver(recipient_data, message)
                except (smtplib.SMTPException, OSError) as e:
                    # OSError covers connection failures and timeouts
                    print("EXCEPTION")
                    print(repr(e))
                    logging.error("Recipient email address failed: %s\n=== Exception ===\n%s",
                                  recipient_data.get('recipient'), repr(e))
                    self._retry_handler(recipient_data)

                    # save the number of failed recipients to the stats file
                    failed_recipients = failed_recipients + 1
                    self._stats("FAILED RECIPIENTS: %s" % failed_recipients)
                else:
                    # save the last recipient to the stats file incase the process fails
                    self._stats("LAST RECIPIENT: %s" % recipient_data.get('recipient'))

                    # allow the system to sleep for 1 sec to take load off the SMTP server
                    sleep(1)

    def send_test(self):
        """
        Send the email to the test recipients listed in config.py.
        """
        self.send(recipient_list=config.TEST_RECIPIENTS)

    def resend_failed(self):
        """
        Try and resend to failed recipients two more times.
        """
        for i in range(1, 3):
            self.send(retry_count=i)

    def count_recipients(self, csv_path=None):
        """
        Return the number of valid recipients in the given (or default) csv.
        """
        return len(self._parse_csv(csv_path))
# Command line interface: two mode flags (-t test run, -s real run) followed
# by four positional arguments. Each positional uses nargs=1, so its parsed
# value is a one-element list.
parser = argparse.ArgumentParser(description='PyMailer, a simple bulk mailer script')
parser.add_argument('-t', dest='test_only', action='store_true',
                    help='Send to test emails in config.py only')
parser.add_argument('-s', dest='send', action='store_true',
                    help='Send to email in the supplied CSV file')
parser.add_argument('txt', metavar='txt', nargs=1, help='txt template')
parser.add_argument('html', metavar='html', nargs=1, help='HTML template')
# The recipients file is read with its first row as the header, and only rows
# with a valid "email" value are used, so the help text says so.
parser.add_argument('addresses', metavar='dest_csv', nargs=1,
                    help='CSV of recipients with a header row; an "email" column is required, "name" is optional')
parser.add_argument('subject', metavar='subject', nargs=1, help='Email subject')
if __name__ == '__main__':
    # Script entry point: parse the command line, prepare the retry and
    # stats files, ask for confirmation, then run either the real mailing
    # (-s) or the test mailing (-t).
    args = parser.parse_args()

    if not (args.test_only or args.send):
        print("ERROR: Either -t or -s must be specified")
        sys.exit(1)

    # Creates a new retry file or overwrites the old one
    with open(config.CSV_RETRY_FILENAME, 'w'):
        pass

    # The stats file is only created when missing; existing content is kept
    if not os.path.exists(config.STATS_FILE):
        with open(config.STATS_FILE, 'w'):
            pass

    pymailer = PyMailer(args)

    if args.send:
        answer = input("You are about to send to %s recipients. Do you want to continue (yes/no)? " % pymailer.count_recipients())
        if answer != 'yes':
            print("Aborted.")
            sys.exit()

        # save the csv file used to the stats file
        pymailer._stats("CSV USED: %s" % args.addresses[0])

        # send the email and try resend to failed recipients
        pymailer.send()
        pymailer.resend_failed()
    else:
        # -s was not given, so the check above guarantees -t was
        answer = input("You are about to send a test mail to all recipients as specified in config.py. Do you want to continue (yes/no)? ")
        if answer != 'yes':
            print("Aborted.")
            sys.exit()

        pymailer.send_test()

    # save the end time to the stats file
    pymailer._stats("END TIME: %s" % datetime.now())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment