Created
January 14, 2021 17:34
-
-
Save julian-r/063bde1abdbb9fe99bd139094aa7e316 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Small Wrapper for the mailjet api. | |
https://dev.mailjet.com/email/guides/send-api-v31/ | |
""" | |
from typing import List, Optional | |
from pydantic import BaseModel, EmailStr, Field | |
import httpx | |
SEND_API_ENDPOINT = "https://api.mailjet.com/v3.1/send" | |
class EmailAddress(BaseModel): | |
email: EmailStr = Field(alias="Email") | |
name: str = Field(alias="Name") | |
class Message(BaseModel): | |
from_: EmailStr = Field(alias="From") | |
to: List[EmailStr] = Field(alias="To") | |
subject: str = Field(alias="Subject") | |
text_part: str = Field(alias="TextPart") | |
html_part: Optional[str] = Field(alias="HTMLPart") | |
class SendApiBody(BaseModel): | |
messages: List[Message] = Field(alias="Messages") | |
class MailJetClient: | |
def __init__(self, api_key: str, api_secret: str, sender_address: str): | |
self.api_key = api_key | |
self.api_secret = api_secret | |
self.sender_address = sender_address | |
async def send(self, messages: List[Message]) -> None: | |
body = SendApiBody(messages=messages) | |
async with httpx.AsyncClient() as cl: | |
resp = await cl.post( | |
SEND_API_ENDPOINT, | |
json=body.dict(), | |
auth=(self.api_key, self.api_secret), | |
) | |
resp.raise_for_status() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment