Last active
April 23, 2023 22:10
-
-
Save pirogoeth/89235ceab7273487544b45fc004d452f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
import argparse | |
import enum | |
import itertools | |
import smtplib | |
import time | |
import threading | |
import textwrap | |
import socket | |
import sys | |
from datetime import datetime | |
from email.message import EmailMessage | |
# Serialises writes to stdout so result lines from different worker threads
# never interleave.
_output_lock = threading.Lock()

# Set to ask every worker thread (and any in-progress session loop) to stop.
_shutdown = threading.Event()
class TestingMode(enum.Enum):
    """The kinds of SMTP test this script can run.

    Each member corresponds to one module-level session function, which is
    looked up lazily by :meth:`get_test_function`.
    """

    SPEED = 1
    TIMEOUT = 2
    THROUGHPUT = 3

    def get_test_function(self):
        """Return the session function that implements this mode.

        Raises:
            ValueError: if this member is not one of the known modes.
        """
        # Plain Enum members are singletons, so identity is equivalent to
        # equality here.
        if self is TestingMode.SPEED:
            return noop_smtp_session
        if self is TestingMode.TIMEOUT:
            return hanging_smtp_session
        if self is TestingMode.THROUGHPUT:
            return throughput_smtp_session
        raise ValueError("Unknown test mode")
class TestOptions:
    """Plain container for the settings shared by every test worker.

    ``test_mode`` is given as the *name* of a ``TestingMode`` member and is
    stored as the member itself; every other argument is stored unchanged
    under an attribute of the same name.
    """

    def __init__(self, test_mode: str, interval: float, attempt_interval: float,
                 timeout: float, smtp_username: str, smtp_password: str,
                 smtp_from: str, smtp_to: str, smtp_debug: bool = False,
                 *,
                 dont_actually_send: bool = False, once: bool = False,
                 throughput_message_count: int = 100):
        # Which test to run; an unknown name raises KeyError from the lookup.
        self.test_mode = TestingMode[test_mode]

        # Timing knobs (all in seconds).
        self.interval, self.attempt_interval, self.timeout = (
            interval, attempt_interval, timeout,
        )

        # SMTP credentials, envelope addresses and protocol tracing.
        self.smtp_username, self.smtp_password = smtp_username, smtp_password
        self.smtp_from, self.smtp_to = smtp_from, smtp_to
        self.smtp_debug = smtp_debug

        # Keyword-only behaviour switches.
        self.dont_actually_send = dont_actually_send
        self.once = once
        self.throughput_message_count = throughput_message_count
def noop_smtp_session(options: TestOptions, host: str, port: int, ssl_enabled=False) -> float:
    """Time a minimal SMTP session: connect, send NOOP, disconnect.

    Args:
        options: Test settings; ``timeout`` and ``smtp_debug`` are read.
        host: SMTP server hostname.
        port: SMTP server port.
        ssl_enabled: Connect with ``smtplib.SMTP_SSL`` instead of
            ``smtplib.SMTP`` when true.

    Returns:
        Elapsed time of the session in (fractional) seconds.

    Raises:
        Whatever ``smtplib`` / the socket layer raises on connection or
        protocol failure; nothing is caught here.
    """
    conn_factory = smtplib.SMTP_SSL if ssl_enabled else smtplib.SMTP

    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments (NTP steps, manual clock changes).
    start = time.perf_counter()
    with conn_factory(host=host, port=port, timeout=options.timeout) as chat:
        chat.set_debuglevel(2 if options.smtp_debug else 0)
        chat.noop()

    return time.perf_counter() - start
def hanging_smtp_session(options: TestOptions, host: str, port: int, ssl_enabled: bool = False) -> float:
    """Probe how long an authenticated SMTP connection may sit idle.

    Logs in once, then repeatedly sends a message and idles for a linearly
    growing period (``attempt_interval * attempts``) on the same connection
    until an operation times out or shutdown is requested.

    Args:
        options: Test settings; credentials, addresses, ``timeout``,
            ``attempt_interval``, ``smtp_debug`` and ``dont_actually_send``
            are read.
        host: SMTP server hostname.
        port: SMTP server port.
        ssl_enabled: Connect with ``smtplib.SMTP_SSL`` when true.

    Returns:
        The length in seconds of the most recent idle period when a timeout
        occurred (``0`` if the timeout happened before any message was sent).

    Raises:
        Exception: with message ``"shutdown"`` when the loop ends because the
            shutdown event was set.
        Any other ``smtplib`` / socket error propagates unchanged.
    """
    conn_factory = smtplib.SMTP_SSL if ssl_enabled else smtplib.SMTP

    attempts = 0
    try:
        with conn_factory(host=host, port=port, timeout=options.timeout) as chat:
            chat.set_debuglevel(2 if options.smtp_debug else 0)
            chat.login(options.smtp_username, options.smtp_password)

            while not _shutdown.is_set():
                message = "\n".join([
                    "Subject: SMTP Chat Timer TIMEOUT Test",
                    f"From: {options.smtp_from}",
                    f"To: {options.smtp_to}",
                    f"X-Mailgun-Drop-Message: {'yes' if options.dont_actually_send else 'no'}",
                    "", "",
                    "Hello from SMTP Chat Timer!",
                    f"Message generated at {datetime.utcnow()} from host {socket.gethostname()}.",
                    f"This message will be sent every {options.attempt_interval}*attempts seconds.",
                    f"Testing is currently on the {attempts}th attempt. Next attempt will be in {options.attempt_interval * (attempts + 1)} seconds.",
                    "",
                    "With Love,",
                    " SMTP Chat Timer",
                ])
                chat.sendmail(options.smtp_from, options.smtp_to, message)

                attempts += 1
                # Idle on the open connection; waiting on the event (rather
                # than time.sleep) lets a shutdown request end a long idle
                # period immediately.
                _shutdown.wait(options.attempt_interval * attempts)
    except (TimeoutError, socket.timeout):
        # socket.timeout only became an alias of TimeoutError in Python 3.10;
        # naming both keeps older interpreters working.
        # The idle period that preceded the timeout is based on
        # attempt_interval (the value actually waited above), not on the
        # unrelated between-tests `interval`.
        # NOTE(review): a server that actively closes the idle connection
        # surfaces as smtplib.SMTPServerDisconnected, which is not handled
        # here and propagates to the caller - confirm that is intended.
        return options.attempt_interval * attempts

    # Only reached when the loop stopped because shutdown was requested.
    raise Exception("shutdown")
def throughput_smtp_session(options: TestOptions, host: str, port: int, ssl_enabled: bool = False) -> float:
    """Time how long it takes to send a burst of messages over one session.

    Connects, logs in, and sends ``options.throughput_message_count``
    messages back to back on the same connection.

    Args:
        options: Test settings; credentials, addresses, ``timeout``,
            ``smtp_debug``, ``dont_actually_send`` and
            ``throughput_message_count`` are read.
        host: SMTP server hostname.
        port: SMTP server port.
        ssl_enabled: Connect with ``smtplib.SMTP_SSL`` when true.

    Returns:
        Elapsed time of the whole session in (fractional) seconds.

    Raises:
        Whatever ``smtplib`` / the socket layer raises; nothing is caught.
    """
    conn_factory = smtplib.SMTP_SSL if ssl_enabled else smtplib.SMTP

    # Monotonic clock: immune to wall-clock adjustments during the run.
    start = time.perf_counter()
    with conn_factory(host=host, port=port, timeout=options.timeout) as chat:
        chat.set_debuglevel(2 if options.smtp_debug else 0)
        chat.login(options.smtp_username, options.smtp_password)

        for i in range(options.throughput_message_count):
            message = "\n".join([
                "Subject: SMTP Chat Timer THROUGHPUT Test",
                f"From: {options.smtp_from}",
                f"To: {options.smtp_to}",
                f"X-Mailgun-Drop-Message: {'yes' if options.dont_actually_send else 'no'}",
                "", "",
                "Hello from SMTP Chat Timer!",
                f"Message generated at {datetime.utcnow()} from host {socket.gethostname()}.",
                "This message will be sent repeatedly as fast as possible.",
                f"Testing is currently on the {i}th message.",
                "",
                "With Love,",
                " SMTP Chat Timer",
            ])
            chat.sendmail(options.smtp_from, options.smtp_to, message)

    return time.perf_counter() - start
def write_entry(host: str, port: int, time_taken: float, error: object):
    """Print one CSV-style result line to stdout.

    The line has the form ``timestamp, host, port, time_taken, error`` where
    ``timestamp`` is the current Unix epoch time in seconds.

    Args:
        host: SMTP server hostname the result belongs to.
        port: SMTP server port the result belongs to.
        time_taken: Measured duration in seconds (callers pass ``-1`` on
            failure).
        error: The exception that occurred, or the string ``"none"``; it is
            rendered with ``str()`` formatting.
    """
    # time.time() is the real epoch timestamp. The previous
    # datetime.utcnow().timestamp() interpreted the naive UTC datetime as
    # local time, shifting the value by the machine's UTC offset.
    current_time = time.time()

    # Hold the lock so lines from concurrent workers do not interleave.
    with _output_lock:
        print(f"{current_time}, {host}, {port}, {time_taken}, {error}")
def test_loop(options: TestOptions, host: str, port: int, ssl_enabled: bool = False):
    """Worker body: run the selected test against one endpoint until shutdown.

    Each iteration runs the session function for ``options.test_mode``,
    writes one result line, then waits ``options.interval`` seconds.

    When ``options.once`` is set, the shared shutdown event is set after the
    first *successful* test; a failed test is logged and retried.

    Args:
        options: Shared test settings.
        host: SMTP server hostname to test.
        port: SMTP server port to test.
        ssl_enabled: Passed through to the session function.
    """
    test_function = options.test_mode.get_test_function()

    while not _shutdown.is_set():
        error = "none"
        try:
            time_taken = test_function(options, host, port, ssl_enabled=ssl_enabled)
        except Exception as err:
            # Best-effort monitoring loop: record the failure and carry on.
            time_taken = -1
            error = err
        else:
            if options.once:
                _shutdown.set()

        write_entry(host, port, time_taken, error)

        # Wait on the event instead of time.sleep() so a shutdown request
        # (Ctrl-C or --once) ends the pause immediately rather than after a
        # full interval.
        _shutdown.wait(options.interval)
def main():
    """Parse command-line options and run one test worker per endpoint.

    One daemon thread is started for every (host, port) combination. The
    main thread waits until all workers have finished, or, on Ctrl-C /
    ``SystemExit``, signals shutdown and waits for them to stop.
    """
    test_modes = [mode.name for mode in TestingMode]

    parser = argparse.ArgumentParser(
        description="Test SMTP servers for speed and timeout behaviour"
    )
    parser.add_argument(
        "--hosts",
        type=str,
        default=["smtp.mailgun.org", "smtp.eu.mailgun.org"],
        # nargs="*" takes space-separated values after a single flag;
        # repeating the flag would only keep the last occurrence.
        help="List of hosts to test. Separate multiple hosts with spaces. (default: %(default)s)",
        nargs="*",
    )
    parser.add_argument(
        "--ports",
        type=str,
        default=["25", "465:ssl", "587", "2525"],
        help="Ports to test. Specify `:ssl` after the port to use SSL. Separate multiple ports with spaces. (default: %(default)s)",
        nargs="*",
    )
    parser.add_argument(
        "--test-mode",
        type=str,
        default=TestingMode.SPEED.name,
        help="Test mode to use (default: %(default)s)",
        choices=test_modes,
    )
    parser.add_argument(
        "--interval",
        type=float,
        default=1.0,
        help="Interval between tests in seconds (default: %(default)s)",
    )
    parser.add_argument(
        "--attempt-interval",
        type=float,
        default=0.0,
        help="Interval between attempts in seconds (default: %(default)s)",
    )
    parser.add_argument(
        "--timeout",
        type=float,
        default=1.0,
        help="Timeout for SMTP connections in seconds (default: %(default)s)",
    )
    parser.add_argument(
        "--smtp-username",
        type=str,
        default="nobody@example.com",
        help="Username to use for SMTP connections (default: %(default)s)",
    )
    parser.add_argument(
        "--smtp-password",
        type=str,
        default="",
        help="Password to use for SMTP connections (default: %(default)s)",
    )
    parser.add_argument(
        "--smtp-from",
        type=str,
        default="SMTP Chat Timer <smtptimer@example.org>",
        help="From address to use for SMTP connections (default: %(default)s)",
    )
    parser.add_argument(
        "--smtp-to",
        type=str,
        default="target@example.com",
        help="To address to use for SMTP connections (default: %(default)s)",
    )
    parser.add_argument(
        "--smtp-debug",
        action="store_true",
        help="Enable SMTP debug output",
        default=False,
    )
    parser.add_argument(
        "--dont-actually-send",
        action="store_true",
        help="Don't actually send any messages",
        default=False,
    )
    parser.add_argument(
        "--once",
        action="store_true",
        help="Run the test once and exit",
        default=False,
    )
    parser.add_argument(
        "--throughput-message-count",
        type=int,
        default=100,
        help="Number of messages to send in each throughput test session (default: %(default)s)",
    )
    args = parser.parse_args(sys.argv[1:])

    options = TestOptions(
        test_mode=args.test_mode,
        interval=args.interval,
        attempt_interval=args.attempt_interval,
        timeout=args.timeout,
        smtp_username=args.smtp_username,
        smtp_password=args.smtp_password,
        smtp_from=args.smtp_from,
        smtp_to=args.smtp_to,
        smtp_debug=args.smtp_debug,
        dont_actually_send=args.dont_actually_send,
        once=args.once,
        throughput_message_count=args.throughput_message_count,
    )

    # Each port spec is "<port>" or "<port>:<option>"; only the option "ssl"
    # enables SSL. partition() tolerates specs containing extra colons.
    test_ports = []
    for spec in args.ports:
        port, _, option = spec.partition(":")
        test_ports.append((int(port), option == "ssl"))

    print("# current time, host, port, time taken in seconds, error")

    children = []
    for host, (port, ssl_enabled) in itertools.product(args.hosts, test_ports):
        child = threading.Thread(
            target=test_loop,
            args=(options, host, port, ssl_enabled),
            daemon=True,
        )
        child.name = f"{host}-{port}-ssl-{ssl_enabled}"
        child.start()
        children.append(child)

    try:
        # Poll with short join timeouts so Ctrl-C is handled promptly, and
        # stop as soon as every worker has finished (e.g. with --once)
        # instead of cycling over finished threads forever.
        while any(child.is_alive() for child in children):
            for child in children:
                child.join(timeout=0.1)
    except (KeyboardInterrupt, SystemExit):
        _shutdown.set()
        for child in children:
            child.join()
# Only start the tester when executed as a script, so importing this module
# (for reuse or testing) has no side effects.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment