Skip to content

Instantly share code, notes, and snippets.

@samuelcolvin
Last active July 5, 2018 18:38
Show Gist options
  • Save samuelcolvin/3652427c07fac775d0cdc8af127c0ed1 to your computer and use it in GitHub Desktop.
Save samuelcolvin/3652427c07fac775d0cdc8af127c0ed1 to your computer and use it in GitHub Desktop.
import asyncio
import aiodns
from aiosmtplib import SMTP, SMTPException, SMTPStatus
from async_timeout import timeout
async def _validate_email(email: str, loop: asyncio.AbstractEventLoop) -> None:
    """Probe the mail server responsible for ``email`` over SMTP.

    Looks up the MX records of the address's domain, connects to the
    most-preferred exchanger on port 25 and checks the greeting and ``HELO``
    reply codes. It then issues ``MAIL FROM`` / ``RCPT TO`` for the address
    and hands those replies to ``debug``. On success the greeting and
    ``HELO`` replies are logged.

    Args:
        email: Address to check; must contain an ``@`` followed by a domain.
        loop: Event loop passed to the DNS resolver, the timeout and the
            SMTP client.

    Raises:
        EmailError: If the address has no domain part, the MX answer is
            empty, or the greeting / ``HELO`` reply code is not the expected
            one.
        asyncio.TimeoutError: If the MX lookup exceeds the 2 second timeout.
    """
    _, separator, domain = email.partition('@')
    if not separator or not domain:
        # Previously surfaced as a bare IndexError from ``split(...)[1]``.
        raise EmailError(f'invalid email address "{email}": missing domain')

    resolver = aiodns.DNSResolver(loop=loop)
    # NOTE(review): the original indentation was lost; the timeout is assumed
    # to guard only the DNS lookup -- confirm it was not meant to cover the
    # SMTP conversation as well.
    async with timeout(2, loop=loop):
        mx_records = await resolver.query(domain, 'MX')
    if not mx_records:
        raise EmailError(f'no MX records found for "{domain}"')
    # The lowest preference value is the most-preferred exchanger (RFC 5321);
    # attribute access also works where the records are not indexable.
    host = min(mx_records, key=lambda record: record.priority).host

    smtp = SMTP(hostname=host, port=25, loop=loop)
    try:
        code, ans1 = await smtp.connect()
        if code != SMTPStatus.ready:
            raise EmailError(f'wrong smtp response {code} (expected 220) ans="{ans1}"')
        code, ans2 = await smtp.helo()
        if code != SMTPStatus.completed:
            raise EmailError(f'wrong smtp response {code} (expected 250) ans="{ans2}"')
        # could also do the following but probably too far
        code, ans3 = await smtp.mail('whatever@example.com')
        debug(code, ans3)
        code, ans4 = await smtp.rcpt(email)
        debug(code, ans4)
    finally:
        # Always release the socket, whether the probe succeeded or raised.
        smtp.close()
    logger.info('"%s": host=%s, connect ans="%s", helo ans="%s"', email, host, ans1, ans2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment