Skip to content

Instantly share code, notes, and snippets.

@williamsjj
Created July 19, 2010 17:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save williamsjj/481684 to your computer and use it in GitHub Desktop.
Save williamsjj/481684 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
import sys
if sys.platform == "linux2":
from twisted.internet import epollreactor
epollreactor.install()
import datetime, time
from StringIO import StringIO
from twisted.internet import reactor, task
from twisted.internet import defer
from twisted.mail import smtp
EMAIL_FROM = "jasonjwwilliams@gmail.com"
EMAIL_TO = "test@test.com"
TARGET_SERVER = "1.2.3.4"
# Verify Recipients SMTP Client
class ESMTPVerifyRecipsClient(smtp.ESMTPClient):
"""Async ESMTP client that verifies the given recipients are valid and then
disconnects. No DATA transmission."""
def smtpState_toOrData(self, code, resp):
"""Overrides the built-in smtpState_toOrData so it disconnects after
verifying recipients instead of moving to a DATA connection."""
if self.lastAddress is not None:
self.toAddressesResult.append((self.lastAddress, code, resp))
if code in smtp.SUCCESS:
self.successAddresses.append(self.lastAddress)
try:
self.lastAddress = self.toAddresses.next()
except StopIteration:
if self.successAddresses:
return self.smtpState_msgSent(code,'No recipients accepted')
else:
return self.smtpState_msgSent(code,'No recipients accepted')
else:
self.sendLine('RCPT TO:%s' % smtp.quoteaddr(self.lastAddress))
class ESMTPVerifyRecipsSender(smtp.SenderMixin, ESMTPVerifyRecipsClient):
def __init__(self, username, secret, contextFactory=None, *args, **kw):
self.heloFallback = 0
self.username = username
if contextFactory is None:
contextFactory = self._getContextFactory()
ESMTPVerifyRecipsClient.__init__(self, secret, contextFactory, *args, **kw)
def _getContextFactory(self):
if self.context is not None:
return self.context
try:
from twisted.internet import ssl
except ImportError:
return None
else:
try:
context = ssl.ClientContextFactory()
context.method = ssl.SSL.TLSv1_METHOD
return context
except AttributeError:
return None
######
check_status = []
def regularCheck():
num_left = 0
for x in range(len(check_status)):
if check_status[x] == 0:
num_left = num_left + 1
print "%s - (Num Finished: %d) (Num Left: %d)" % (str(datetime.datetime.now()), len(check_status) - num_left, num_left)
def SMTPdone(i):
print "SMTP %d done." % i
check_status[i] = 1
def SMTPerr(failure, i):
print "SMTP %d failed." % i
check_status[i] = -1
def SMTPcall():
for i in range(50):
check_status.append(0)
d = defer.Deferred()
factory = smtp.ESMTPSenderFactory(None, None, EMAIL_FROM,
EMAIL_TO,
StringIO("Test"),
d,
retries=1,
requireAuthentication=False,
requireTransportSecurity=False)
d.addCallback(SMTPdone, i).addErrback(SMTPerr, i)
factory.protocol = ESMTPVerifyRecipsSender
factory.domain = "digitarx.com"
factory.noisy = False
print "Starting SMTP %d." % i
reactor.connectTCP(TARGET_SERVER, 25, factory, timeout=10)
if __name__ == "__main__":
reactor.callLater(0, SMTPcall)
task.LoopingCall(regularCheck).start(1)
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment