Skip to content

Instantly share code, notes, and snippets.

@pirogoeth
Last active April 23, 2023 22:10
Show Gist options
  • Save pirogoeth/89235ceab7273487544b45fc004d452f to your computer and use it in GitHub Desktop.
Save pirogoeth/89235ceab7273487544b45fc004d452f to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import enum
import itertools
import smtplib
import time
import threading
import textwrap
import socket
import sys
from datetime import datetime
from email.message import EmailMessage
# Held while printing a result line so output from different test threads
# is never interleaved (see write_entry).
_output_lock = threading.Lock()
# Set to tell every test thread to stop looping: on KeyboardInterrupt/SystemExit
# in main, or after a `--once` run in test_loop.
_shutdown = threading.Event()
class TestingMode(enum.Enum):
    """Kinds of SMTP test this script can run.

    Each member is backed by one module-level session function, which
    ``get_test_function`` looks up.
    """

    SPEED = 1
    TIMEOUT = 2
    THROUGHPUT = 3

    def get_test_function(self):
        """Return the session function that implements this mode.

        Raises:
            ValueError: If no session function is registered for the mode.
        """
        # The table is built at call time because the session functions are
        # defined further down the module than this class.
        handlers = {
            TestingMode.SPEED: noop_smtp_session,
            TestingMode.TIMEOUT: hanging_smtp_session,
            TestingMode.THROUGHPUT: throughput_smtp_session,
        }
        handler = handlers.get(self)
        if handler is None:
            raise ValueError("Unknown test mode")
        return handler
class TestOptions:
    """Settings shared by every test thread.

    Attributes mirror the constructor arguments one-to-one, except that
    ``test_mode`` is stored as the ``TestingMode`` member with that name.
    """

    def __init__(
        self,
        test_mode: str,
        interval: float,
        attempt_interval: float,
        timeout: float,
        smtp_username: str,
        smtp_password: str,
        smtp_from: str,
        smtp_to: str,
        smtp_debug: bool = False,
        *,
        dont_actually_send: bool = False,
        once: bool = False,
        throughput_message_count: int = 100,
    ):
        """Store the options; ``test_mode`` must be a ``TestingMode`` member name.

        Raises:
            KeyError: If ``test_mode`` does not name a ``TestingMode`` member.
        """
        # Behaviour switches.
        self.test_mode = TestingMode[test_mode]
        self.once = once
        self.dont_actually_send = dont_actually_send
        self.throughput_message_count = throughput_message_count

        # Timing, all in seconds.
        self.interval = interval
        self.attempt_interval = attempt_interval
        self.timeout = timeout

        # SMTP credentials, envelope addresses and debug flag.
        self.smtp_username = smtp_username
        self.smtp_password = smtp_password
        self.smtp_from = smtp_from
        self.smtp_to = smtp_to
        self.smtp_debug = smtp_debug
def noop_smtp_session(options: TestOptions, host: str, port: int, ssl_enabled: bool = False) -> float:
    """Time a minimal SMTP session: connect, issue NOOP, disconnect.

    Args:
        options: Supplies ``timeout`` for the connection and ``smtp_debug``.
        host: SMTP server host name.
        port: SMTP server port.
        ssl_enabled: Use ``smtplib.SMTP_SSL`` instead of ``smtplib.SMTP``.

    Returns:
        Elapsed seconds from just before connecting until the connection
        context manager has exited.

    Raises:
        Any exception raised by ``smtplib`` or the underlying socket is
        propagated to the caller.
    """
    conn_factory = smtplib.SMTP_SSL if ssl_enabled else smtplib.SMTP
    # A monotonic clock is used so that wall-clock adjustments (NTP, manual
    # changes) cannot distort or negate the measured duration.
    start = time.monotonic()
    with conn_factory(host=host, port=port, timeout=options.timeout) as chat:
        chat.set_debuglevel(2 if options.smtp_debug else 0)
        chat.noop()
    return time.monotonic() - start
def hanging_smtp_session(options: TestOptions, host: str, port: int, ssl_enabled: bool = False) -> float:
    """Probe how long a logged-in SMTP connection may sit idle before timing out.

    After logging in, a message is sent, then the connection is left idle for
    ``options.attempt_interval * attempts`` seconds, with ``attempts`` growing
    by one each round. This repeats until a socket timeout occurs or shutdown
    is requested.

    Args:
        options: Supplies timeout, credentials, addresses, debug flag,
            ``attempt_interval`` and ``dont_actually_send``.
        host: SMTP server host name.
        port: SMTP server port.
        ssl_enabled: Use ``smtplib.SMTP_SSL`` instead of ``smtplib.SMTP``.

    Returns:
        ``options.attempt_interval * attempts``: the length of the last idle
        period before the timeout was observed (0 if the timeout happened
        before any message was sent).

    Raises:
        Exception: With message ``"shutdown"`` when the shutdown event ends
            the loop before any timeout was seen.
        Other ``smtplib``/socket errors are propagated unchanged.
    """
    conn_factory = smtplib.SMTP_SSL if ssl_enabled else smtplib.SMTP
    attempts = 0
    try:
        with conn_factory(host=host, port=port, timeout=options.timeout) as chat:
            chat.set_debuglevel(2 if options.smtp_debug else 0)
            chat.login(options.smtp_username, options.smtp_password)
            # Loop-invariant pieces of the message.
            hostname = socket.gethostname()
            drop_flag = 'yes' if options.dont_actually_send else 'no'
            while not _shutdown.is_set():
                message = "\n".join([
                    "Subject: SMTP Chat Timer TIMEOUT Test",
                    f"From: {options.smtp_from}",
                    f"To: {options.smtp_to}",
                    f"X-Mailgun-Drop-Message: {drop_flag}",
                    "", "",
                    "Hello from SMTP Chat Timer!",
                    f"Message generated at {datetime.utcnow()} from host {hostname}.",
                    f"This message will be sent every {options.attempt_interval}*attempts seconds.",
                    f"Testing is currently on the {attempts}th attempt. Next attempt will be in {options.attempt_interval * (attempts + 1)} seconds.",
                    "",
                    "With Love,",
                    " SMTP Chat Timer",
                ])
                chat.sendmail(options.smtp_from, options.smtp_to, message)
                attempts += 1
                # Idle on the shutdown event rather than time.sleep so that a
                # shutdown request interrupts a long wait immediately.
                _shutdown.wait(options.attempt_interval * attempts)
    except (TimeoutError, socket.timeout):
        # socket.timeout only became an alias of TimeoutError in Python 3.10;
        # naming both keeps older interpreters working.
        # The idle period is driven by attempt_interval (not the between-test
        # interval), so that is what the reported duration is based on.
        # NOTE(review): a server that closes an idle connection surfaces as
        # smtplib.SMTPServerDisconnected, which is deliberately still
        # propagated here - confirm whether it should count as a timeout.
        return options.attempt_interval * attempts
    # The loop can only end without a timeout when shutdown was requested.
    raise Exception("shutdown")
def throughput_smtp_session(options: TestOptions, host: str, port: int, ssl_enabled: bool = False) -> float:
    """Time how long it takes to send a batch of messages over one connection.

    Logs in once and sends ``options.throughput_message_count`` messages
    back to back on the same connection.

    Args:
        options: Supplies timeout, credentials, addresses, debug flag,
            ``throughput_message_count`` and ``dont_actually_send``.
        host: SMTP server host name.
        port: SMTP server port.
        ssl_enabled: Use ``smtplib.SMTP_SSL`` instead of ``smtplib.SMTP``.

    Returns:
        Elapsed seconds from just before connecting until the connection
        context manager has exited.

    Raises:
        Any exception raised by ``smtplib`` or the underlying socket is
        propagated to the caller.
    """
    conn_factory = smtplib.SMTP_SSL if ssl_enabled else smtplib.SMTP
    # Loop-invariant pieces of the message, computed once.
    hostname = socket.gethostname()
    drop_flag = 'yes' if options.dont_actually_send else 'no'
    # A monotonic clock is used so wall-clock adjustments cannot skew the result.
    start = time.monotonic()
    with conn_factory(host=host, port=port, timeout=options.timeout) as chat:
        chat.set_debuglevel(2 if options.smtp_debug else 0)
        chat.login(options.smtp_username, options.smtp_password)
        for i in range(options.throughput_message_count):
            message = "\n".join([
                "Subject: SMTP Chat Timer THROUGHPUT Test",
                f"From: {options.smtp_from}",
                f"To: {options.smtp_to}",
                f"X-Mailgun-Drop-Message: {drop_flag}",
                "", "",
                "Hello from SMTP Chat Timer!",
                f"Message generated at {datetime.utcnow()} from host {hostname}.",
                "This message will be sent repeatedly as fast as possible.",
                f"Testing is currently on the {i}th message.",
                "",
                "With Love,",
                " SMTP Chat Timer",
            ])
            chat.sendmail(options.smtp_from, options.smtp_to, message)
    return time.monotonic() - start
def write_entry(host: str, port: int, time_taken: float, error: object):
    """Print one comma-separated result line to stdout.

    The line has the form ``timestamp, host, port, time_taken, error``.

    Args:
        host: Host that was tested.
        port: Port that was tested.
        time_taken: Measured duration in seconds (callers pass -1 on failure).
        error: ``"none"`` or the exception that was raised; it is rendered
            with its normal string formatting.
    """
    # time.time() is the real Unix epoch time. The previous
    # datetime.utcnow().timestamp() interpreted the naive UTC value as local
    # time, shifting the timestamp by the UTC offset on non-UTC machines.
    current_time = time.time()
    # Hold the lock so lines from concurrent test threads do not interleave.
    with _output_lock:
        print(f"{current_time}, {host}, {port}, {time_taken}, {error}")
def test_loop(options: TestOptions, host: str, port: int, ssl_enabled: bool = False):
    """Repeatedly run the configured test against one endpoint and log each result.

    Runs until the shutdown event is set. Each iteration writes one result
    line via ``write_entry``; a failed test is logged with a duration of -1
    and the exception as the error field.

    Args:
        options: Test settings; ``test_mode`` selects the session function,
            ``interval`` is the pause between runs, ``once`` stops after the
            first run.
        host: SMTP server host name.
        port: SMTP server port.
        ssl_enabled: Passed through to the session function.
    """
    test_function = options.test_mode.get_test_function()
    while not _shutdown.is_set():
        error = "none"
        try:
            time_taken = test_function(options, host, port, ssl_enabled=ssl_enabled)
        except Exception as err:
            # Top-level boundary of the worker thread: record the failure
            # instead of letting the thread die.
            time_taken = -1
            error = err
        write_entry(host, port, time_taken, error)
        if options.once:
            # Honour --once whether the run succeeded or failed; previously a
            # failing endpoint kept being retried forever.
            _shutdown.set()
        # Waiting on the event (instead of time.sleep) ends the pause as soon
        # as shutdown is requested.
        _shutdown.wait(options.interval)
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with every option ``main`` understands."""
    test_modes = [mode.name for mode in TestingMode]
    parser = argparse.ArgumentParser(
        description="Test SMTP servers for speed and timeout behaviour"
    )
    # With nargs="*" the values are given space-separated after one flag;
    # repeating the flag would replace, not extend, the list.
    parser.add_argument(
        "--hosts",
        type=str,
        default=["smtp.mailgun.org", "smtp.eu.mailgun.org"],
        help="List of hosts to test. Separate multiple hosts with spaces. (default: %(default)s)",
        nargs="*",
    )
    parser.add_argument(
        "--ports",
        type=str,
        default=["25", "465:ssl", "587", "2525"],
        help="Ports to test. Specify `:ssl` after the port to use SSL. Separate multiple ports with spaces. (default: %(default)s)",
        nargs="*",
    )
    parser.add_argument(
        "--test-mode",
        type=str,
        default=TestingMode.SPEED.name,
        help="Test mode to use (default: %(default)s)",
        choices=test_modes,
    )
    parser.add_argument(
        "--interval",
        type=float,
        default=1.0,
        help="Interval between tests in seconds (default: %(default)s)",
    )
    parser.add_argument(
        "--attempt-interval",
        type=float,
        default=0.0,
        help="Interval between attempts in seconds (default: %(default)s)",
    )
    parser.add_argument(
        "--timeout",
        type=float,
        default=1.0,
        help="Timeout for SMTP connections in seconds (default: %(default)s)",
    )
    parser.add_argument(
        "--smtp-username",
        type=str,
        default="nobody@example.com",
        help="Username to use for SMTP connections (default: %(default)s)",
    )
    parser.add_argument(
        "--smtp-password",
        type=str,
        default="",
        help="Password to use for SMTP connections (default: %(default)s)",
    )
    parser.add_argument(
        "--smtp-from",
        type=str,
        default="SMTP Chat Timer <smtptimer@example.org>",
        help="From address to use for SMTP connections (default: %(default)s)",
    )
    parser.add_argument(
        "--smtp-to",
        type=str,
        default="target@example.com",
        help="To address to use for SMTP connections (default: %(default)s)",
    )
    parser.add_argument(
        "--smtp-debug",
        action="store_true",
        help="Enable SMTP debug output",
        default=False,
    )
    parser.add_argument(
        "--dont-actually-send",
        action="store_true",
        help="Don't actually send any messages",
        default=False,
    )
    parser.add_argument(
        "--once",
        action="store_true",
        help="Run the test once and exit",
        default=False,
    )
    parser.add_argument(
        "--throughput-message-count",
        type=int,
        default=100,
        help="Number of messages to send in each throughput test session (default: %(default)s)",
    )
    return parser


def _parse_port_spec(spec: str):
    """Split a ``PORT[:option]`` spec into ``(port, ssl_enabled)``.

    SSL is enabled only when the option is exactly ``ssl``.
    """
    port, _, option = spec.partition(":")
    return int(port), option == "ssl"


def main():
    """Parse the command line and run one test thread per host/port pair.

    Prints a header line, starts a daemon thread running ``test_loop`` for
    every combination of host and port, then waits until all threads have
    finished. On KeyboardInterrupt or SystemExit the shutdown event is set
    and the threads are joined.
    """
    args = _build_arg_parser().parse_args(sys.argv[1:])
    options = TestOptions(
        test_mode=args.test_mode,
        interval=args.interval,
        attempt_interval=args.attempt_interval,
        timeout=args.timeout,
        smtp_username=args.smtp_username,
        smtp_password=args.smtp_password,
        smtp_from=args.smtp_from,
        smtp_to=args.smtp_to,
        smtp_debug=args.smtp_debug,
        dont_actually_send=args.dont_actually_send,
        once=args.once,
        throughput_message_count=args.throughput_message_count,
    )
    test_ports = [_parse_port_spec(spec) for spec in args.ports]

    print("# current time, host, port, time taken in seconds, error")

    children = []
    for host, (port, ssl_enabled) in itertools.product(args.hosts, test_ports):
        child = threading.Thread(
            target=test_loop,
            args=(options, host, port, ssl_enabled),
            daemon=True,
            name=f"{host}-{port}-ssl-{ssl_enabled}",
        )
        child.start()
        children.append(child)

    try:
        # Poll with short join timeouts so the main thread stays responsive to
        # Ctrl-C, and stop once every worker has finished. The previous
        # itertools.cycle loop never ended, so `--once` could not exit.
        while any(child.is_alive() for child in children):
            for child in children:
                child.join(timeout=0.1)
    except (KeyboardInterrupt, SystemExit):
        _shutdown.set()
        for child in children:
            child.join()
# Only run the tests when executed as a script, so the module can be imported
# (e.g. to reuse the session functions) without starting any threads.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment