Skip to content

Instantly share code, notes, and snippets.

@ritwickdey
Last active February 13, 2025 16:29
Show Gist options
  • Save ritwickdey/ffeb7a88a86e92c7bdb92644e8fba449 to your computer and use it in GitHub Desktop.
Save ritwickdey/ffeb7a88a86e92c7bdb92644e8fba449 to your computer and use it in GitHub Desktop.
Mail merge using Gmail and Google Sheets (PS: most of the code is generated)
{
"sender_email": "email@email",
"app_password": "your_app_password",
"spreadsheet_id": "your_spreadsheet_id",
"sheet_name": "sheet_name",
"template_subject": "Subject_line",
"sender_placeholder": "email_placeholder",
"start_row": 2,
"end_row": 4,
"email_delay": 10
}
  1. Reads the email template from a Gmail draft and the recipient list from a Google spreadsheet.
  2. Writes the send status (success or failure) and the send time back to the spreadsheet after each email.
  3. Supports template variables.
  4. Similar to Google's mail merge Apps Script, but that script doesn't work for more than 100 emails.
  5. `email`, `sent status` and `sent at` are the required columns in the spreadsheet; all other columns are available as template variables.

PS: Inline attachments are showing as normal attachments. (Bug)

from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
import os
import pickle
import base64
import datetime
import sys
import time
import smtplib
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from email.utils import formataddr
from email import message_from_bytes
import json
from quopri import decodestring, encodestring
# OAuth scopes requested by get_credentials(): the Sheets scope is used to read
# the recipient rows and write the send status back, the Gmail scope to read
# the draft that serves as the email template.
# The granted token is cached in token.pickle, so delete that file after
# changing this list.
SCOPES = [
'https://www.googleapis.com/auth/spreadsheets',
'https://www.googleapis.com/auth/gmail.modify',
]
# SMTP endpoint used by main() to send the messages; main() calls starttls()
# on the connection before logging in.
SMTP_SERVER = "smtp.gmail.com"
SMTP_PORT = 587
def get_credentials(token_path='token.pickle', credentials_path='credentials.json'):
    """Return Google OAuth2 user credentials, authorizing the user if needed.

    Cached credentials are loaded from *token_path*. If they are missing,
    unreadable or no longer valid they are refreshed (when a refresh token is
    available) or re-created by running the installed-app flow with the client
    secrets in *credentials_path* and the module-level ``SCOPES``. Newly
    obtained credentials are written back to *token_path*.

    Args:
        token_path: Pickle file used to cache the credentials.
        credentials_path: OAuth client secrets file passed to the flow.

    Returns:
        The credentials object (loaded, refreshed or newly created).
    """
    creds = None
    if os.path.exists(token_path):
        # NOTE(security): pickle.load can execute arbitrary code embedded in
        # the file. Only use a token cache this script wrote itself.
        try:
            with open(token_path, 'rb') as token:
                creds = pickle.load(token)
        except (pickle.UnpicklingError, EOFError, AttributeError, ImportError) as e:
            # A truncated or stale cache must not block the run; fall through
            # to a fresh authorization instead.
            print(f"Ignoring unreadable token cache {token_path}: {e}")
            creds = None

    if creds and creds.valid:
        return creds

    if creds and creds.expired and creds.refresh_token:
        creds.refresh(Request())
    else:
        flow = InstalledAppFlow.from_client_secrets_file(credentials_path, SCOPES)
        creds = flow.run_local_server(port=0)

    with open(token_path, 'wb') as token:
        pickle.dump(creds, token)
    return creds
def get_draft_by_subject(service, subject):
    """Find the Gmail draft whose Subject header equals *subject*.

    Every page of the draft listing is scanned (following ``nextPageToken``),
    so the template is found even when the mailbox holds more drafts than fit
    in a single list response.

    Args:
        service: Gmail API service object.
        subject: Exact subject line to look for.

    Returns:
        The matching draft fetched with ``format='raw'``, or ``None`` when no
        draft has that subject.

    Raises:
        Exception: Any error raised while talking to the API is printed and
            re-raised unchanged.
    """
    drafts_api = service.users().drafts()
    try:
        page_token = None
        while True:
            list_kwargs = {'userId': 'me'}
            if page_token:
                list_kwargs['pageToken'] = page_token
            page = drafts_api.list(**list_kwargs).execute()

            for draft in page.get('drafts', []):
                draft_data = drafts_api.get(userId='me', id=draft['id']).execute()
                headers = draft_data['message']['payload']['headers']
                # Header names are case-insensitive; the subject text itself
                # must match exactly.
                matches = any(
                    header['name'].lower() == 'subject' and header['value'] == subject
                    for header in headers
                )
                if matches:
                    return drafts_api.get(userId='me', id=draft['id'],
                                          format='raw').execute()

            page_token = page.get('nextPageToken')
            if not page_token:
                return None
    except Exception as e:
        print(f"Error fetching draft: {e}")
        raise
def send_email(smtp_session,
               to_email,
               subject,
               body: MIMEMultipart,
               sender_email,
               sender_placeholder=None,
               attachments=None):
    """Address *body* to one recipient and send it over *smtp_session*.

    The Subject, From, To, Date and List-Unsubscribe headers carried by the
    message are dropped and Subject, From, To and List-Unsubscribe are set
    again for this recipient. *body* is modified in place.

    Args:
        smtp_session: Object exposing ``send_message(message)``.
        to_email: Recipient address.
        subject: Subject line for this message.
        body: Message to send; attachments are added to it.
        sender_email: Address used in From and in the unsubscribe mailto link.
        sender_placeholder: Optional display name for the From header.
        attachments: Optional iterable of dicts with the keys ``data``
            (URL-safe base64 string), ``filename`` and ``mimeType``.

    Returns:
        ``(True, "Email sent successfully")`` on success, otherwise
        ``(False, <error text>)``. Exceptions are printed, not raised.
    """
    try:
        if not to_email or not to_email.strip():
            raise ValueError("Recipient email address is empty")

        message = body
        # Deleting a header that is absent is a no-op, so this is safe for
        # drafts that lack some of these headers.
        for header in ('Subject', 'From', 'To', 'Date', 'List-Unsubscribe'):
            del message[header]

        message['To'] = to_email.strip()
        message['Subject'] = subject
        if sender_placeholder:
            message['From'] = formataddr((sender_placeholder, sender_email))
        else:
            message['From'] = sender_email
        message['List-Unsubscribe'] = f'<mailto:{sender_email}?subject=unsubscribe>'

        for attachment in attachments or ():
            # Build the part with its real MIME type up front. Adding a second
            # Content-Type header afterwards would leave the part declared as
            # application/octet-stream.
            mime_type = (attachment.get('mimeType') or 'application/octet-stream')
            maintype, _, subtype = mime_type.split(';', 1)[0].strip().partition('/')
            part = MIMEBase(maintype or 'application', subtype or 'octet-stream')
            part.set_payload(base64.urlsafe_b64decode(attachment['data']))
            encoders.encode_base64(part)
            # Passing filename as a parameter lets the email package quote it,
            # which keeps names containing spaces intact.
            part.add_header('Content-Disposition', 'attachment',
                            filename=attachment['filename'])
            message.attach(part)

        # Send email via SMTP
        smtp_session.send_message(message)
        return True, "Email sent successfully"
    except Exception as e:
        print(f"Error sending email: {e}")
        return False, str(e)
def _find_column_index(headers, possible_names):
    """Return the index of the first header matching one of *possible_names*, or -1.

    Matching ignores case and surrounding whitespace.
    """
    for name in possible_names:
        wanted = name.lower().strip()
        for i, header in enumerate(headers):
            if header.lower().strip() == wanted:
                return i
    return -1


def _column_letter(index):
    """Convert a zero-based column index to A1 column letters (0 -> A, 26 -> AA)."""
    letters = ''
    index += 1
    while index > 0:
        index, remainder = divmod(index - 1, 26)
        letters = chr(ord('A') + remainder) + letters
    return letters


def process_spreadsheet(
    spreadsheet_id,
    start_row,
    end_row,
    template_subject,
    creds,
    smtp_session,
    sender_email,
    sender_placeholder="",
    email_delay=10,
    sheet_name="Sheet1",  # Add default sheet name
):
    """Send one templated email per spreadsheet row and record the outcome.

    Rows ``start_row``..``end_row`` (columns A-Z) of *sheet_name* are read
    together with the header row 1. The header row must contain the columns
    ``email``, ``sent status`` and ``sent at`` (case-insensitive); every
    column is offered to the template as a variable named after its header.
    Rows whose status already starts with ``Success`` are skipped. After each
    attempt only the status and timestamp cells of that row are written back.

    Args:
        spreadsheet_id: ID of the Google spreadsheet.
        start_row: First data row to process (1-based sheet row number).
        end_row: Last data row to process.
        template_subject: Subject of the Gmail draft used as template; it is
            also run through variable replacement for each row.
        creds: Credentials used to build the Sheets and Gmail services.
        smtp_session: Session handed to ``send_email``.
        sender_email: Sender address handed to ``send_email``.
        sender_placeholder: Sender display name handed to ``send_email``.
        email_delay: Seconds to sleep after each send attempt.
        sheet_name: Name of the sheet (tab) to read and update.

    Returns:
        None. Errors are printed rather than raised.
    """
    try:
        # Build sheets service
        sheets_service = build('sheets', 'v4', credentials=creds)
        # Build Gmail service
        gmail_service = build('gmail', 'v1', credentials=creds)

        # Get template from drafts
        draft_msg_template = get_draft_by_subject(gmail_service, template_subject)
        if not draft_msg_template:
            raise ValueError("Template not found in drafts")

        # Get spreadsheet data
        sheet = sheets_service.spreadsheets()
        result = sheet.values().get(
            spreadsheetId=spreadsheet_id,
            range=f'{sheet_name}!A{start_row}:Z{end_row}',
        ).execute()
        values = result.get('values', [])
        if not values:
            print('No data found.')
            return

        # Get header row
        header_result = sheet.values().get(
            spreadsheetId=spreadsheet_id,
            range=f'{sheet_name}!A1:Z1',
        ).execute()
        headers = header_result.get('values', [[]])[0]

        email_idx = _find_column_index(headers, ['email'])
        status_idx = _find_column_index(headers, ['sent status'])
        timestamp_idx = _find_column_index(headers, ['sent at'])
        # Explicit raises instead of assert: asserts vanish under ``python -O``.
        if email_idx < 0:
            raise ValueError("Email column not found. Expected: 'email'")
        if status_idx < 0:
            raise ValueError("Status column not found. Expected: 'sent status'")
        if timestamp_idx < 0:
            raise ValueError("Timestamp column not found. Expected: 'sent at'")
        status_col = _column_letter(status_idx)
        timestamp_col = _column_letter(timestamp_idx)

        skipped = 0
        sent = 0
        failed = 0

        # Process each row
        for row_idx, row in enumerate(values, start=start_row):
            # Returned rows can be shorter than the header row. Pad to the
            # full header width so every column exists as a (possibly empty)
            # template variable.
            row = list(row) + [''] * (len(headers) - len(row))

            if row[status_idx].startswith('Success'):
                print(f"Skipping row {row_idx}: Already sent")
                skipped += 1
                continue

            # Create variables dict for template
            variables = dict(zip(headers, row))
            to_email = row[email_idx].strip()

            send_attempted = False
            try:
                if not to_email:
                    raise ValueError("Missing email address")
                email_content = clone_draft_and_replace_variable(draft_msg_template, variables)
                email_subject = replace_template_variables(template_subject, variables)
            except Exception as e:
                # Template problems are recorded on the row like send failures
                # instead of being silently dropped.
                print(f"Error processing row {row_idx}: {e}")
                success, message = False, str(e)
            else:
                send_attempted = True
                success, message = send_email(
                    smtp_session,
                    to_email,
                    email_subject,
                    email_content,
                    sender_email,
                    sender_placeholder,
                )

            # Update status and timestamp
            status = 'Success' if success else f'Failed: {message}'
            now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            if success:
                sent += 1
            else:
                failed += 1

            # Write only the two bookkeeping cells. Rewriting the whole row
            # with RAW values would overwrite every other cell in the row
            # with the values fetched above.
            try:
                sheet.values().batchUpdate(
                    spreadsheetId=spreadsheet_id,
                    body={
                        'valueInputOption': 'RAW',
                        'data': [
                            {'range': f'{sheet_name}!{status_col}{row_idx}',
                             'values': [[status]]},
                            {'range': f'{sheet_name}!{timestamp_col}{row_idx}',
                             'values': [[now]]},
                        ],
                    },
                ).execute()
                print(f"Processed row {row_idx}: {status}: {to_email}")
            except Exception as e:
                # The email may already be out; a rerun would not see the
                # status for this row.
                print(f"Error processing row {row_idx}: {e}")

            if send_attempted and email_delay > 0:
                time.sleep(email_delay)

        print(f"Processed {len(values)} rows. Sent: {sent}, Failed: {failed}, Skipped: {skipped}")
    except Exception as e:
        print(f"Error: {e}")
def replace_template_variables(template, variables):
    """Replace ``{{ name }}`` placeholders in *template* with values from *variables*.

    Whitespace between the braces and the name is ignored, so ``{{name}}``,
    ``{{ name }}``, ``{{ name}}`` and ``{{name }}`` are all recognised.
    Variable names are stripped before matching and matched case-sensitively.
    All placeholders are substituted in a single pass, so text inserted from a
    value is never scanned for placeholders again.

    Args:
        template: Text containing placeholders.
        variables: Mapping of variable name to value; values are converted
            with ``str()``.

    Returns:
        The template with every placeholder replaced.

    Raises:
        ValueError: If the template contains a placeholder with no matching
            variable, or a stray ``{{``.
    """
    import re  # local import: this file does not import re at module level

    lookup = {}
    for key, value in variables.items():
        # setdefault keeps the first of several keys that strip to the same name.
        lookup.setdefault(key.strip(), str(value))

    unresolved = []

    def substitute(match):
        name = match.group(1)
        if name in lookup:
            return lookup[name]
        unresolved.append(match.group(0))
        return match.group(0)

    # Braces are excluded from the name so that "{{{name}}}" still resolves
    # the inner "{{name}}".
    placeholder = re.compile(r'\{\{\s*([^{}]*?)\s*\}\}')
    content = placeholder.sub(substitute, template)

    # An opening "{{" that never closes is still reported as unreplaced.
    if not unresolved and '{{' in placeholder.sub('', template):
        unresolved.append('{{')

    if unresolved:
        print(f"Warning: Unreplaced variables in template: {', '.join(unresolved)}")
        raise ValueError("Unreplaced variables in template")
    return content
def clone_draft_and_replace_variable(draft, variables):
    """Build a fresh message from *draft* with template variables filled in.

    The raw draft is parsed into a new message object on every call, so the
    draft itself is never modified. Each ``text/plain`` and ``text/html`` part
    that is not an attachment is decoded according to its own
    Content-Transfer-Encoding and charset, run through
    ``replace_template_variables`` and stored again as quoted-printable UTF-8
    with matching headers.

    Args:
        draft: Draft resource containing the URL-safe base64 message under
            ``draft['message']['raw']``.
        variables: Mapping of variable name to value.

    Returns:
        The parsed message with placeholders replaced.

    Raises:
        ValueError: Propagated from ``replace_template_variables`` when a
            placeholder cannot be resolved.
    """
    from email import charset as email_charset  # local: not imported at file level

    # UTF-8 charset that serialises bodies as quoted-printable.
    qp_utf8 = email_charset.Charset('utf-8')
    qp_utf8.body_encoding = email_charset.QP

    raw_draft_bytes = base64.urlsafe_b64decode(draft['message']['raw'])
    new_email = message_from_bytes(raw_draft_bytes)

    for part in new_email.walk():
        if part.get_content_type() not in ('text/html', 'text/plain'):
            continue
        # Attached text files are payload, not template text.
        if part.get_content_disposition() == 'attachment':
            continue

        # decode=True undoes whatever transfer encoding the part declares
        # (quoted-printable, base64, or none).
        payload_bytes = part.get_payload(decode=True) or b''
        source_charset = part.get_content_charset() or 'utf-8'
        try:
            content = payload_bytes.decode(source_charset, errors='replace')
        except LookupError:
            # Unknown charset label: fall back to UTF-8.
            content = payload_bytes.decode('utf-8', errors='replace')

        content = replace_template_variables(content, variables)

        # set_payload only applies the charset's body encoding when no
        # Content-Transfer-Encoding header is present, so drop the old one.
        del part['Content-Transfer-Encoding']
        part.set_payload(content, charset=qp_utf8)
    return new_email
def load_config(skip_input=False):
    """Load the run configuration from ``config.json``, optionally prompting.

    Values found in ``config.json`` are used as defaults. Unless *skip_input*
    is true, the user is prompted for every setting and may press Enter to
    keep the default. In both modes the result is normalised: optional
    settings get defaults (``start_row`` 2, ``email_delay`` 10,
    ``sheet_name`` ``Sheet1``, ``sender_placeholder`` empty), row numbers and
    the delay are converted to ``int``, and required settings are checked.

    Args:
        skip_input: When true, do not prompt; rely on the file alone.

    Returns:
        dict with the keys ``sender_email``, ``app_password``,
        ``spreadsheet_id``, ``template_subject``, ``start_row``, ``end_row``,
        ``sender_placeholder``, ``sheet_name`` and ``email_delay``.

    Raises:
        ValueError: If a required setting is missing, a number cannot be
            parsed, ``start_row`` is below 2, or ``end_row`` is below
            ``start_row``.
    """
    try:
        with open('config.json', 'r', encoding='utf-8') as f:
            config = json.load(f)
    except FileNotFoundError:
        config = {}

    # Get required values with fallback to user input, showing defaults
    def get_input(key, prompt, default=""):
        value = config.get(key, default)
        if value:
            user_input = input(f"{prompt} (default: {value}): ").strip()
            return user_input if user_input else value
        return input(f"{prompt}: ")

    def get_secret(key, prompt):
        # Never echo the stored password as a visible default, and do not
        # echo what the user types.
        import getpass
        stored = config.get(key, "")
        if stored:
            entered = getpass.getpass(f"{prompt} (press Enter to keep the stored value): ")
            return entered if entered else stored
        return getpass.getpass(f"{prompt}: ")

    def get_int(key, prompt, default=None):
        stored = config.get(key, default)
        # Only advertise a default when there is one, and never try to parse
        # the string 'None'.
        suffix = f" (default: {stored})" if stored is not None else ""
        raw = input(f"{prompt}{suffix}: ").strip()
        if raw:
            return int(raw)
        return stored

    if not skip_input:
        config['sender_email'] = get_input('sender_email', "Enter your Gmail address")
        config['app_password'] = get_secret('app_password', "Enter your Gmail App Password")
        config['spreadsheet_id'] = get_input('spreadsheet_id', "Enter the Google Spreadsheet ID")
        config['template_subject'] = get_input(
            'template_subject', "Enter the email template subject to use from drafts")
        config['start_row'] = get_int('start_row', "Enter start row number", 2)
        config['end_row'] = get_int('end_row', "Enter end row number")
        config['sender_placeholder'] = get_input('sender_placeholder', "Enter sender name")
        config['sheet_name'] = get_input('sheet_name', "Enter sheet name", "Sheet1")
        config['email_delay'] = get_int('email_delay', "Enter email delay in seconds", 10)

    # Defaults for optional settings, so callers can index the result safely
    # in --skip mode as well.
    config.setdefault('start_row', 2)
    config.setdefault('sender_placeholder', "")
    config.setdefault('sheet_name', "Sheet1")
    config.setdefault('email_delay', 10)

    required = ('sender_email', 'app_password', 'spreadsheet_id', 'template_subject', 'end_row')
    missing = [key for key in required if config.get(key) in (None, "")]
    if missing:
        raise ValueError(f"Missing required config value(s): {', '.join(missing)}")

    config['start_row'] = int(config['start_row'])
    config['end_row'] = int(config['end_row'])
    config['email_delay'] = int(config['email_delay'])

    # Validate start_row
    if config['start_row'] < 2:
        raise ValueError("Start row must be 2 or greater")
    if config['end_row'] < config['start_row']:
        raise ValueError("End row must be greater than or equal to start row")
    return config
def main():
    """Authorize, load the configuration, open the SMTP session and run the merge.

    Passing ``--skip`` on the command line loads ``config.json`` without
    prompting.
    """
    # Get credentials
    creds = get_credentials()

    # Load config
    skip_input = "--skip" in sys.argv
    config = load_config(skip_input)

    # Setup SMTP session. The context manager closes the connection on every
    # exit path, including a failure in starttls() or login().
    with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as smtp_session:
        smtp_session.starttls()
        smtp_session.login(config['sender_email'], config['app_password'])
        process_spreadsheet(
            config['spreadsheet_id'],
            config.get('start_row', 2),
            config['end_row'],
            config['template_subject'],
            creds,
            smtp_session,
            config['sender_email'],
            # Optional settings may be absent from a hand-written config.json.
            config.get('sender_placeholder', ""),
            config.get('email_delay', 10),
            config.get('sheet_name', "Sheet1"),  # Pass sheet name to function
        )


if __name__ == '__main__':
    main()
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
import os
import pickle
import base64
import datetime
import time
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from email.mime.text import MIMEText
from email.utils import formataddr
from email import message_from_bytes
# OAuth scopes requested by get_credentials(): the Sheets scope is used to read
# the recipient rows and write the send status back, the Gmail scope to read
# the template draft and send the messages.
# If modifying these scopes, delete the file token.pickle.
SCOPES = [
'https://www.googleapis.com/auth/spreadsheets',
'https://www.googleapis.com/auth/gmail.modify',
]
def get_credentials(token_path='token.pickle', credentials_path='credentials.json'):
    """Return Google OAuth2 user credentials, authorizing the user if needed.

    Cached credentials are loaded from *token_path*. If they are missing,
    unreadable or no longer valid they are refreshed (when a refresh token is
    available) or re-created by running the installed-app flow with the client
    secrets in *credentials_path* and the module-level ``SCOPES``. Newly
    obtained credentials are written back to *token_path*.

    Args:
        token_path: Pickle file used to cache the credentials.
        credentials_path: OAuth client secrets file passed to the flow.

    Returns:
        The credentials object (loaded, refreshed or newly created).
    """
    creds = None
    if os.path.exists(token_path):
        # NOTE(security): pickle.load can execute arbitrary code embedded in
        # the file. Only use a token cache this script wrote itself.
        try:
            with open(token_path, 'rb') as token:
                creds = pickle.load(token)
        except (pickle.UnpicklingError, EOFError, AttributeError, ImportError) as e:
            # A truncated or stale cache must not block the run; fall through
            # to a fresh authorization instead.
            print(f"Ignoring unreadable token cache {token_path}: {e}")
            creds = None

    if creds and creds.valid:
        return creds

    if creds and creds.expired and creds.refresh_token:
        creds.refresh(Request())
    else:
        flow = InstalledAppFlow.from_client_secrets_file(credentials_path, SCOPES)
        creds = flow.run_local_server(port=0)

    with open(token_path, 'wb') as token:
        pickle.dump(creds, token)
    return creds
def get_draft_by_subject(service, subject):
    """Find the Gmail draft whose Subject header equals *subject*.

    Every page of the draft listing is scanned (following ``nextPageToken``),
    so the template is found even when the mailbox holds more drafts than fit
    in a single list response.

    Args:
        service: Gmail API service object.
        subject: Exact subject line to look for.

    Returns:
        The matching draft fetched with ``format='raw'``, or ``None`` when no
        draft has that subject.

    Raises:
        Exception: Any error raised while talking to the API is printed and
            re-raised unchanged.
    """
    drafts_api = service.users().drafts()
    try:
        page_token = None
        while True:
            list_kwargs = {'userId': 'me'}
            if page_token:
                list_kwargs['pageToken'] = page_token
            page = drafts_api.list(**list_kwargs).execute()

            for draft in page.get('drafts', []):
                draft_data = drafts_api.get(userId='me', id=draft['id']).execute()
                headers = draft_data['message']['payload']['headers']
                # Header names are case-insensitive; the subject text itself
                # must match exactly.
                matches = any(
                    header['name'].lower() == 'subject' and header['value'] == subject
                    for header in headers
                )
                if matches:
                    return drafts_api.get(userId='me', id=draft['id'],
                                          format='raw').execute()

            page_token = page.get('nextPageToken')
            if not page_token:
                return None
    except Exception as e:
        print(f"Error fetching draft: {e}")
        raise
def send_email(service,
               to_email,
               subject,
               body: MIMEMultipart,
               sender_placeholder=None,
               attachments=None):
    """Address *body* to one recipient and send it through the Gmail API.

    The Subject, From, To and Date headers carried by the message are dropped;
    Subject, From and To are set again for this recipient. The sender address
    comes from ``get_user_email(service)``. *body* is modified in place.

    Args:
        service: Gmail API service object.
        to_email: Recipient address.
        subject: Subject line for this message.
        body: Message to send; attachments are added to it.
        sender_placeholder: Optional display name for the From header.
        attachments: Optional iterable of dicts with the keys ``data``
            (URL-safe base64 string), ``filename`` and ``mimeType``.

    Returns:
        ``(True, <sent message id>)`` on success, otherwise
        ``(False, <error text>)``. Exceptions are printed, not raised.
    """
    try:
        if not to_email or not to_email.strip():
            raise ValueError("Recipient email address is empty")

        message = body
        # Deleting a header that is absent is a no-op, so this is safe for
        # drafts that lack some of these headers.
        for header in ('Subject', 'From', 'To', 'Date'):
            del message[header]

        message['To'] = to_email.strip()
        message['Subject'] = subject

        user_email = get_user_email(service)
        if not user_email:
            raise ValueError("Could not get user email address")
        # From was deleted above, so set it again in both cases.
        if sender_placeholder:
            message['From'] = formataddr((sender_placeholder, user_email))
        else:
            message['From'] = user_email

        for attachment in attachments or ():
            # Build the part with its real MIME type up front. Adding a second
            # Content-Type header afterwards would leave the part declared as
            # application/octet-stream.
            mime_type = (attachment.get('mimeType') or 'application/octet-stream')
            maintype, _, subtype = mime_type.split(';', 1)[0].strip().partition('/')
            part = MIMEBase(maintype or 'application', subtype or 'octet-stream')
            part.set_payload(base64.urlsafe_b64decode(attachment['data']))
            encoders.encode_base64(part)
            # Passing filename as a parameter lets the email package quote it,
            # which keeps names containing spaces intact.
            part.add_header('Content-Disposition', 'attachment',
                            filename=attachment['filename'])
            message.attach(part)

        # as_bytes() serialises the message directly to bytes.
        raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode('ascii')
        send_message = service.users().messages().send(userId='me', body={
            'raw': raw_message,
        }).execute()
        return True, send_message['id']
    except Exception as e:
        print(f"Error sending email: {e}")
        return False, str(e)
def get_user_email(service):
    """Return the authenticated user's email address, or ``None`` on failure.

    Args:
        service: Gmail API service object.

    Returns:
        The ``emailAddress`` field of the user's Gmail profile. If the profile
        request fails or the field is absent, the error is printed and
        ``None`` is returned.
    """
    try:
        profile = service.users().getProfile(userId='me').execute()
        return profile['emailAddress']
    except Exception as e:
        print(f"Error getting user email: {e}")
        return None
def _find_column_index(headers, possible_names):
    """Return the index of the first header matching one of *possible_names*, or -1.

    Matching ignores case and surrounding whitespace.
    """
    for name in possible_names:
        wanted = name.lower().strip()
        for i, header in enumerate(headers):
            if header.lower().strip() == wanted:
                return i
    return -1


def _column_letter(index):
    """Convert a zero-based column index to A1 column letters (0 -> A, 26 -> AA)."""
    letters = ''
    index += 1
    while index > 0:
        index, remainder = divmod(index - 1, 26)
        letters = chr(ord('A') + remainder) + letters
    return letters


def process_spreadsheet(
    spreadsheet_id,
    start_row,
    end_row,
    template_subject,
    creds,
    sender_placeholder="",
    email_delay=10,
    sheet_name="Sheet1",
):
    """Send one templated email per spreadsheet row and record the outcome.

    Rows ``start_row``..``end_row`` (columns A-Z) of *sheet_name* are read
    together with the header row 1. The header row must contain the columns
    ``email``, ``sent status`` and ``sent at`` (case-insensitive); every
    column is offered to the template as a variable named after its header.
    Rows whose status already starts with ``Success`` are skipped. After each
    attempt only the status and timestamp cells of that row are written back.

    Args:
        spreadsheet_id: ID of the Google spreadsheet.
        start_row: First data row to process (1-based sheet row number).
        end_row: Last data row to process.
        template_subject: Subject of the Gmail draft used as template; it is
            also run through variable replacement for each row.
        creds: Credentials used to build the Sheets and Gmail services.
        sender_placeholder: Sender display name handed to ``send_email``.
        email_delay: Seconds to sleep after each send attempt.
        sheet_name: Name of the sheet (tab) to read and update.

    Returns:
        None. Errors are printed rather than raised.
    """
    try:
        # Build services
        sheets_service = build('sheets', 'v4', credentials=creds)
        gmail_service = build('gmail', 'v1', credentials=creds)

        # Get template from drafts
        draft_msg_template = get_draft_by_subject(gmail_service, template_subject)
        if not draft_msg_template:
            raise ValueError("Template not found in drafts")

        # Get spreadsheet data
        sheet = sheets_service.spreadsheets()
        result = sheet.values().get(
            spreadsheetId=spreadsheet_id,
            range=f'{sheet_name}!A{start_row}:Z{end_row}',
        ).execute()
        values = result.get('values', [])
        if not values:
            print('No data found.')
            return

        # Get header row
        header_result = sheet.values().get(
            spreadsheetId=spreadsheet_id,
            range=f'{sheet_name}!A1:Z1',
        ).execute()
        headers = header_result.get('values', [[]])[0]

        # list.index() raises instead of returning -1, so the lookup is done
        # with a helper and checked with explicit raises (asserts vanish
        # under ``python -O``).
        email_idx = _find_column_index(headers, ['email'])
        status_idx = _find_column_index(headers, ['sent status'])
        timestamp_idx = _find_column_index(headers, ['sent at'])
        if email_idx < 0:
            raise ValueError("Email (email) column not found")
        if status_idx < 0:
            raise ValueError("Status column (sent status) not found")
        if timestamp_idx < 0:
            raise ValueError("Timestamp column (sent at) not found")
        status_col = _column_letter(status_idx)
        timestamp_col = _column_letter(timestamp_idx)

        skipped = 0
        sent = 0
        failed = 0

        # Process each row
        for row_idx, row in enumerate(values, start=start_row):
            # Returned rows can be shorter than the header row. Pad to the
            # full header width so every column exists as a (possibly empty)
            # template variable.
            row = list(row) + [''] * (len(headers) - len(row))

            if row[status_idx].startswith('Success'):
                print(f"Skipping row {row_idx}: Already sent")
                skipped += 1
                continue

            # Create variables dict for template
            variables = dict(zip(headers, row))
            to_email = row[email_idx].strip()

            send_attempted = False
            try:
                if not to_email:
                    raise ValueError("Missing email address")
                email_content = clone_draft_and_replace_variable(draft_msg_template, variables)
                email_subject = replace_template_variables(template_subject, variables)
            except Exception as e:
                # Template problems are recorded on the row like send failures
                # instead of being silently dropped.
                print(f"Error processing row {row_idx}: {e}")
                success, message = False, str(e)
            else:
                send_attempted = True
                success, message = send_email(
                    gmail_service,
                    to_email,
                    email_subject,
                    email_content,
                    sender_placeholder,
                )

            # Update status and timestamp
            status = 'Success' if success else f'Failed: {message}'
            now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            if success:
                sent += 1
            else:
                failed += 1

            # Write only the two bookkeeping cells. Rewriting the whole row
            # with RAW values would overwrite every other cell in the row
            # with the values fetched above.
            try:
                sheet.values().batchUpdate(
                    spreadsheetId=spreadsheet_id,
                    body={
                        'valueInputOption': 'RAW',
                        'data': [
                            {'range': f'{sheet_name}!{status_col}{row_idx}',
                             'values': [[status]]},
                            {'range': f'{sheet_name}!{timestamp_col}{row_idx}',
                             'values': [[now]]},
                        ],
                    },
                ).execute()
                print(f"Processed row {row_idx}: {status}: {to_email}")
            except Exception as e:
                # The email may already be out; a rerun would not see the
                # status for this row.
                print(f"Error processing row {row_idx}: {e}")

            if send_attempted and email_delay > 0:
                time.sleep(email_delay)

        print(f"Processed {len(values)} rows. Sent: {sent}, Failed: {failed}, Skipped: {skipped}")
    except Exception as e:
        print(f"Error: {e}")
def replace_template_variables(template, variables):
    """Replace ``{{ name }}`` placeholders in *template* with values from *variables*.

    Whitespace between the braces and the name is ignored, so ``{{name}}``,
    ``{{ name }}``, ``{{ name}}`` and ``{{name }}`` are all recognised.
    Variable names are stripped before matching and matched case-sensitively.
    All placeholders are substituted in a single pass, so text inserted from a
    value is never scanned for placeholders again.

    Args:
        template: Text containing placeholders.
        variables: Mapping of variable name to value; values are converted
            with ``str()``.

    Returns:
        The template with every placeholder replaced.

    Raises:
        ValueError: If the template contains a placeholder with no matching
            variable, or a stray ``{{``.
    """
    import re  # local import: this file does not import re at module level

    lookup = {}
    for key, value in variables.items():
        # setdefault keeps the first of several keys that strip to the same name.
        lookup.setdefault(key.strip(), str(value))

    unresolved = []

    def substitute(match):
        name = match.group(1)
        if name in lookup:
            return lookup[name]
        unresolved.append(match.group(0))
        return match.group(0)

    # Braces are excluded from the name so that "{{{name}}}" still resolves
    # the inner "{{name}}".
    placeholder = re.compile(r'\{\{\s*([^{}]*?)\s*\}\}')
    content = placeholder.sub(substitute, template)

    # An opening "{{" that never closes is still reported as unreplaced.
    if not unresolved and '{{' in placeholder.sub('', template):
        unresolved.append('{{')

    if unresolved:
        print(f"Warning: Unreplaced variables in template: {', '.join(unresolved)}")
        raise ValueError("Unreplaced variables in template")
    return content
def clone_draft_and_replace_variable(draft, variables):
    """Build a fresh message from *draft* with template variables filled in.

    The raw draft is parsed into a new message object on every call, so the
    draft itself is never modified. Each ``text/plain`` and ``text/html`` part
    that is not an attachment is decoded according to its own
    Content-Transfer-Encoding and charset before substitution. Placeholders
    are therefore matched against the readable text, not the
    transfer-encoded bytes. The result is stored again as quoted-printable
    UTF-8 with matching headers.

    Args:
        draft: Draft resource containing the URL-safe base64 message under
            ``draft['message']['raw']``.
        variables: Mapping of variable name to value.

    Returns:
        The parsed message with placeholders replaced.

    Raises:
        ValueError: Propagated from ``replace_template_variables`` when a
            placeholder cannot be resolved.
    """
    from email import charset as email_charset  # local: not imported at file level

    # UTF-8 charset that serialises bodies as quoted-printable.
    qp_utf8 = email_charset.Charset('utf-8')
    qp_utf8.body_encoding = email_charset.QP

    raw_draft_bytes = base64.urlsafe_b64decode(draft['message']['raw'])
    new_email = message_from_bytes(raw_draft_bytes)

    for part in new_email.walk():
        if part.get_content_type() not in ('text/html', 'text/plain'):
            continue
        # Attached text files are payload, not template text.
        if part.get_content_disposition() == 'attachment':
            continue

        # decode=True undoes whatever transfer encoding the part declares
        # (quoted-printable, base64, or none).
        payload_bytes = part.get_payload(decode=True) or b''
        source_charset = part.get_content_charset() or 'utf-8'
        try:
            content = payload_bytes.decode(source_charset, errors='replace')
        except LookupError:
            # Unknown charset label: fall back to UTF-8.
            content = payload_bytes.decode('utf-8', errors='replace')

        content = replace_template_variables(content, variables)

        # set_payload only applies the charset's body encoding when no
        # Content-Transfer-Encoding header is present, so drop the old one.
        del part['Content-Transfer-Encoding']
        part.set_payload(content, charset=qp_utf8)
    return new_email
def main():
    """Prompt for the run parameters, authorize, and process the spreadsheet.

    Raises:
        ValueError: If a row number or the delay is not an integer, the start
            row is below 2, or the end row is below the start row.
    """
    # Get spreadsheet ID from user
    spreadsheet_id = input("Enter the Google Spreadsheet ID: ")

    # Get template subject
    template_subject = input("Enter the email template subject to use from drafts: ")

    # Get row range; validate each value as soon as it is entered so a bad
    # start row is rejected before the remaining prompts.
    start_row = int(input("Enter start row number (2 or greater): "))
    if start_row < 2:
        raise ValueError("Start row must be 2 or greater")
    end_row = int(input("Enter end row number: "))
    if end_row < start_row:
        # Status is written back to start_row + offset, so the requested
        # range must run forwards.
        raise ValueError("End row must be greater than or equal to start row")

    sender_placeholder = input("Enter sender name: ")
    email_delay = int(input("Enter email delay in seconds (default 10): ").strip() or 10)

    # Get credentials and process spreadsheet
    creds = get_credentials()
    process_spreadsheet(
        spreadsheet_id,
        start_row,
        end_row,
        template_subject,
        creds,
        sender_placeholder,
        email_delay,
    )


if __name__ == '__main__':
    main()
google-auth-oauthlib==1.2.1
google-api-python-client==2.140.0
google-auth==2.33.0
email name sent status sent at
example@foo.co Bar1
example@bar.co Foo1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment