Skip to content

Instantly share code, notes, and snippets.

@julian-r
Created January 14, 2021 17:34
Show Gist options
  • Save julian-r/063bde1abdbb9fe99bd139094aa7e316 to your computer and use it in GitHub Desktop.
Save julian-r/063bde1abdbb9fe99bd139094aa7e316 to your computer and use it in GitHub Desktop.
""" Small Wrapper for the mailjet api.
https://dev.mailjet.com/email/guides/send-api-v31/
"""
from typing import List, Optional
from pydantic import BaseModel, EmailStr, Field
import httpx
SEND_API_ENDPOINT = "https://api.mailjet.com/v3.1/send"
class EmailAddress(BaseModel):
email: EmailStr = Field(alias="Email")
name: str = Field(alias="Name")
class Message(BaseModel):
from_: EmailStr = Field(alias="From")
to: List[EmailStr] = Field(alias="To")
subject: str = Field(alias="Subject")
text_part: str = Field(alias="TextPart")
html_part: Optional[str] = Field(alias="HTMLPart")
class SendApiBody(BaseModel):
messages: List[Message] = Field(alias="Messages")
class MailJetClient:
def __init__(self, api_key: str, api_secret: str, sender_address: str):
self.api_key = api_key
self.api_secret = api_secret
self.sender_address = sender_address
async def send(self, messages: List[Message]) -> None:
body = SendApiBody(messages=messages)
async with httpx.AsyncClient() as cl:
resp = await cl.post(
SEND_API_ENDPOINT,
json=body.dict(),
auth=(self.api_key, self.api_secret),
)
resp.raise_for_status()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment