Last active
February 8, 2025 11:48
-
-
Save tomaszkacprzak/f9f4ed96c716f9d4395661655f054a2e to your computer and use it in GitHub Desktop.
Send Email in a Loop from Templates
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import smtplib | |
import csv | |
import getpass | |
import argparse | |
from email.mime.text import MIMEText | |
from email.mime.multipart import MIMEMultipart | |
from tqdm import tqdm | |
from email_templates import TEMPLATES | |
# Command-line argument parsing
parser = argparse.ArgumentParser(
    description="Send emails to recipients from a CSV file.",
)
parser.add_argument(
    "--dry-run",
    action="store_true",
    help="Preview emails without sending them.",
)
parser.add_argument(
    "--addresses-file",
    type=str,
    required=True,
    help="Path to the addresses CSV file.",
)
parser.add_argument(
    "--smtp-server",
    type=str,
    required=True,
    help="SMTP server address.",
)
parser.add_argument(
    "--smtp-port",
    type=int,
    default=587,
    help="SMTP server port (default: 587).",
)
parser.add_argument(
    "--smtp-user",
    type=str,
    required=True,
    help="SMTP username.",
)
args = parser.parse_args()

# SMTP Configuration (taken verbatim from the parsed command line)
SMTP_SERVER, SMTP_PORT, SMTP_USER = args.smtp_server, args.smtp_port, args.smtp_user

# A dry run never logs in, so the password prompt is skipped entirely.
if args.dry_run:
    SMTP_PASSWORD = None
else:
    SMTP_PASSWORD = getpass.getpass("Enter your email password: ")
# Read recipients from CSV file
def read_csv(filename):
    """Load recipient rows from a CSV file, dropping the header and blank rows.

    Args:
        filename: Path of the CSV file to read.

    Returns:
        A list of rows, each a list of strings as produced by ``csv.reader``.
        Any row whose first cell is exactly ``"First Name"`` is treated as
        the header and omitted; empty rows are omitted as well.
    """
    # 'utf-8-sig' decodes plain UTF-8 and also strips a leading BOM (as
    # written by some spreadsheet exports); with plain 'utf-8' the BOM would
    # stick to the first cell and the header would be read as a recipient.
    with open(filename, newline='', encoding='utf-8-sig') as file:
        # csv.reader yields [] for blank lines; indexing row[0] on those
        # would raise IndexError, so test for emptiness first.
        return [
            row
            for row in csv.reader(file)
            if row and row[0] != "First Name"
        ]
# Write updated recipients to CSV file
def write_csv(filename, recipients):
    """Overwrite *filename* with the fixed header row followed by *recipients*.

    Args:
        filename: Path of the CSV file to (re)create; written as UTF-8.
        recipients: Iterable of row sequences to write after the header.
    """
    header = ["First Name", "Last Name", "Email", "TemplateID", "Sent"]
    with open(filename, "w", newline='', encoding='utf-8') as out:
        # Header and data rows go out through a single writerows call.
        csv.writer(out).writerows([header, *recipients])
# Send email
def send_email(to_address, subject, body, *, timeout=30):
    """Send one plain-text email through the configured SMTP server.

    In dry-run mode (``args.dry_run``) nothing is transmitted; a notice is
    printed and the call reports success.

    Args:
        to_address: Recipient address, used for both the ``To`` header and
            the SMTP envelope.
        subject: Value for the ``Subject`` header.
        body: Plain-text message body.
        timeout: Seconds passed to ``smtplib.SMTP`` as its socket timeout.
            Without it an unresponsive server would block the script
            indefinitely.

    Returns:
        True if the message was handed to the server (or this is a dry run),
        False if any exception occurred; the error is printed, not raised.
    """
    if args.dry_run:
        print("[DRY RUN] Email not sent.")
        return True

    try:
        msg = MIMEMultipart()
        msg['From'] = SMTP_USER
        msg['To'] = to_address
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain'))

        # A fresh connection per message: upgrade to TLS, authenticate, send.
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT, timeout=timeout) as server:
            server.starttls()
            server.login(SMTP_USER, SMTP_PASSWORD)
            server.sendmail(SMTP_USER, to_address, msg.as_string())
        return True
    except Exception as e:
        # Deliberately broad: one failed recipient must not abort the batch.
        print(f"Error sending email to {to_address}: {e}")
        return False
# Main execution
# Walk every recipient row, preview the rendered email, ask for confirmation,
# send it, and persist the "Sent" flag to the CSV after each real send.
recipients = read_csv(args.addresses_file)
total_recipients = len(recipients)

for index, recipient in enumerate(recipients, start=1):
    print(f"=================================================== Processing recipient {index}/{total_recipients}")

    # The unpacking below needs exactly five columns; a short or long row
    # would otherwise raise ValueError and abort the whole run.
    if len(recipient) != 5:
        print(f"Malformed row {recipient!r} (expected 5 columns), skipping.")
        continue

    first_name, last_name, email, template_id, sent_status = recipient
    print(f"Name: {first_name} {last_name}, Email: {email}, Template ID: {template_id}, Sent Status: {sent_status}")

    if sent_status == "1":  # Skip already sent emails
        print(f"Email already sent to {email}, skipping.")
        continue

    if template_id not in TEMPLATES:
        print(f"Invalid template ID {template_id} for {email}, skipping.")
        continue

    # A template missing a key or containing an unexpected placeholder
    # should cost only this recipient, not the rest of the batch.
    try:
        subject = TEMPLATES[template_id]["subject"]
        body = TEMPLATES[template_id]["body"].format(first_name=first_name, last_name=last_name)
    except (KeyError, IndexError, ValueError) as exc:
        print(f"Could not render template {template_id} for {email}: {exc!r}, skipping.")
        continue

    print("\n--- Email Preview ---")
    print(f"To: {email}")
    print(f"Subject: {subject}")
    print(f"Body:\n{body}")
    print("----------------------")

    confirm = input("Send this email? (y/n): ").strip().lower()
    if confirm != "y":
        print("Skipping email.")
        continue

    if not send_email(email, subject, body):
        print("Failed to send email.")
        continue

    # send_email also returns True for a dry run; only a real send may be
    # recorded in the CSV or reported as sent.
    if not args.dry_run:
        recipient[4] = "1"  # Mark as sent
        write_csv(args.addresses_file, recipients)  # Write after each successful email
        print("Email sent successfully.")

print("\nEmail processing complete.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment