Last active February 22, 2023 17:39
SendGrid v3 Mail Send - Async Example
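import asyncio
import os
import urllib.error

import sendgrid
from sendgrid.helpers.mail import Content, Email, Mail

# Assumption: the constructor call is cut off in the source. Reading the key
# from the SENDGRID_API_KEY environment variable is the usual pattern; the
# keyword is `apikey` in sendgrid < 6 and `api_key` in 6+.
sg = sendgrid.SendGridAPIClient(apikey=os.environ.get("SENDGRID_API_KEY"))

from_email = Email("")  # fill in the sender address
to_email = Email("")  # fill in the recipient address
content = Content("text/plain", "This is an asynchronous sending test.")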

# instantiate ten `sendgrid.helpers.mail.Mail` objects, "Message #1" to "Message #10"
ems = [
    Mail(from_email, "Message #{}".format(i), to_email, content)
    for i in range(1, 11)
]
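

async def send_email(n, email):
    """
    send_email wraps SendGrid's API client and makes a POST request to
    the api/v3/mail/send endpoint with `email`.

    Args:
        email<sendgrid.helpers.mail.Mail>: single mail object.
    """
    try:
        # Assumption: the right-hand side is cut off in the source; this is the
        # usual v3 Mail Send call. Note that the call itself is synchronous.
        response = sg.client.mail.send.post(request_body=email.get())
        if response.status_code < 300:
            print("Email #{} processed".format(n), response.body, response.status_code)
    except urllib.error.HTTPError as e:
        # Assumption: the handler body is missing in the source; report the error.
        print("Email #{} failed".format(n), e.code, e.reason)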
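

async def send_many(emails, cb):
    """
    send_many creates a number of tasks (to send email) that will run on
    the existing event loop. Because the tasks are only queued here, you
    can include a callback that will run after all tasks have been queued.

    Args:
        emails<list>: contains any number of `sendgrid.helpers.mail.Mail`.
        cb<function>: a function that will execute immediately.
    """
    print("START - sending emails ...")
    tasks = []
    for n, em in enumerate(emails):
        # asyncio.ensure_future replaces asyncio.async, which is a syntax
        # error on Python 3.7+ because `async` is a reserved keyword.
        tasks.append(asyncio.ensure_future(send_email(n, em)))
    print("END - returning control...")
    cb()
    # Wait for the queued sends so the loop is not closed while they are pending.
    await asyncio.gather(*tasks)


def sample_cb():
    print("Executing callback now...")
    for i in range(0, 100):
        # Assumption: the loop body is missing in the source; print the counter.
        print(i)


if __name__ == "__main__":
    # Assumption: the source ends before the loop is run. asyncio.run
    # (Python 3.7+) creates the loop and runs the coroutine to completion.
    asyncio.run(send_many(ems, sample_cb))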
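
The ordering that the `send_many` docstring describes (tasks are queued first, the callback runs, and only then do the tasks start) can be checked without SendGrid. The sketch below is not part of the gist: `fake_send`, `schedule_many`, and `demo` are hypothetical names, and `asyncio.sleep(0)` stands in for a real send.

```python
import asyncio

events = []


async def fake_send(n):
    # Hypothetical stand-in for send_email; sleep(0) yields to the loop once.
    events.append("send {} started".format(n))
    await asyncio.sleep(0)
    events.append("send {} finished".format(n))


async def schedule_many(count, cb):
    events.append("START")
    # ensure_future only queues the coroutines; none of them has run yet.
    tasks = [asyncio.ensure_future(fake_send(n)) for n in range(count)]
    events.append("END")
    cb()  # executes before any queued task has started
    await asyncio.gather(*tasks)  # now let the queued tasks run


def demo(count=2):
    events.clear()
    asyncio.run(schedule_many(count, lambda: events.append("callback")))
    return list(events)


if __name__ == "__main__":
    for line in demo():
        print(line)
```

The callback is recorded before any "started" entry because a queued task cannot begin until the coroutine that scheduled it reaches an `await`.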

The `response = ...` call in `send_email` (line 40 of the gist) will block, defeating the attempt at an async implementation.

exactly 💯
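
One common way to address this is to hand the blocking call to a thread pool so the event loop stays free. What follows is a minimal sketch, not the gist author's code. It assumes a stand-in `blocking_send` function (hypothetical; it only sleeps and returns a status code) in place of the real SendGrid request, and the names `send_email_offloaded`, `send_all`, and `timed_run` are likewise made up for illustration.

```python
import asyncio
import time


def blocking_send(n):
    # Hypothetical stand-in for the synchronous SendGrid POST; it only sleeps.
    time.sleep(0.2)
    return 202


async def send_email_offloaded(n):
    loop = asyncio.get_running_loop()
    # run_in_executor runs the blocking function in the default thread pool
    # and returns an awaitable, so the event loop is free in the meantime.
    status = await loop.run_in_executor(None, blocking_send, n)
    return n, status


async def send_all(count):
    # gather preserves the order of its arguments in the result list.
    return await asyncio.gather(*(send_email_offloaded(n) for n in range(count)))


def timed_run(count=4):
    started = time.perf_counter()
    results = asyncio.run(send_all(count))
    return results, time.perf_counter() - started


if __name__ == "__main__":
    results, elapsed = timed_run()
    print(results, "in {:.2f}s".format(elapsed))
```

With four sends of 0.2 seconds each, the total should land well under the 0.8 seconds a sequential run would take, because the sleeps overlap in separate threads. On Python 3.9+, `asyncio.to_thread(blocking_send, n)` is a shorter equivalent of the `run_in_executor` call.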
