Skip to content

Instantly share code, notes, and snippets.

@prestonw
Forked from gmemstr/Pipfile
Last active September 5, 2023 08:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save prestonw/9d18d7dfc4c90254cad4773de09f8a2b to your computer and use it in GitHub Desktop.
Save prestonw/9d18d7dfc4c90254cad4773de09f8a2b to your computer and use it in GitHub Desktop.
Speedtest against Hetzner servers

Hetzner Speedtest

This Python application serves as a rudimentary speed test utility designed to evaluate network performance against public Hetzner endpoints. The tool not only measures download speeds but also provides approximate latency metrics to various Hetzner data centers for a comprehensive network assessment.

Usage

python hetzner-speedtest.py [lg|md|sm] -v

-v is optional and enables verbose mode

-h displays options

arg size
lg 10GB
md 1GB
sm 100MB
import sys, requests, time, math, subprocess, platform, threading
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
# Module-wide flag: True when "-v" appears anywhere on the command line.
verbose = '-v' in sys.argv
def downloadFile(u, hostname):
    """Stream ``u``, discard the payload, and return transfer statistics.

    NOTE: this file defines ``downloadFile`` a second time further down; that
    later definition replaces this one when the module is loaded.

    Args:
        u: URL of the test file to download.
        hostname: Host handed to ``get_latency`` for latency samples taken
            while the transfer is running.

    Returns:
        A tuple ``(avg, elapsed_time, avg_latency, max_jitter, packet_loss)``:
        ``avg`` is the mean throughput in bytes per second, ``elapsed_time``
        the wall-clock transfer time in seconds, ``avg_latency`` and
        ``max_jitter`` are derived from the latency samples, and
        ``packet_loss`` is the percentage of the announced Content-Length
        that was not received. Returns ``None`` if the request fails or any
        error occurs.
    """
    chunk_size = 64 * 1024  # 1 KiB chunks make the Python loop the bottleneck
    latency_check_interval = 5  # seconds between latency probes
    try:
        if verbose:
            print(f"Attempting to download from {u}...")
        retry = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
        adapter = HTTPAdapter(max_retries=retry)
        headers = {'User-Agent': 'Mozilla/5.0'}
        # Context managers make sure the connection pool and the streamed
        # response are released on every exit path.
        with requests.Session() as session:
            session.mount('http://', adapter)
            session.mount('https://', adapter)
            if verbose:
                print("Session initialized. Sending GET request...")
            with session.get(u, headers=headers, stream=True, timeout=10) as r:
                if r.status_code != 200:
                    print(f"GET request failed with status code {r.status_code}")
                    return None
                if verbose:
                    print("GET request successful. Streaming content...")
                # The header may be absent (e.g. chunked transfer); 0 means "unknown".
                tl = int(r.headers.get('content-length') or 0)
                if verbose:
                    print(f"Content length: {tl} bytes.")
                latencies = []
                dl = 0
                shown_pct = -1
                # Wall-clock timer: CPU time (time.process_time) excludes the
                # time spent waiting on the network and inflates the speed.
                start_time = time.perf_counter()
                last_latency_check = start_time
                # The payload is only counted, never written anywhere, so no
                # platform-specific null device is needed.
                for c in r.iter_content(chunk_size):
                    dl += len(c)
                    now = time.perf_counter()
                    # Probe latency at a fixed interval instead of spawning a
                    # ping process for every chunk.
                    if now - last_latency_check > latency_check_interval:
                        sample = get_latency(hostname)
                        if sample is not None:
                            latencies.append(sample)
                        last_latency_check = now
                    if tl:
                        pct = min(100, int(100 * dl / tl))
                        if pct != shown_pct:  # redraw only when the bar changes
                            p = pct // 2
                            sys.stdout.write(f"\r[{'#' * p}{'.' * (50 - p)}] {pct}%")
                            sys.stdout.flush()
                            shown_pct = pct
                elapsed_time = time.perf_counter() - start_time
        avg = round(dl / elapsed_time, 2) if elapsed_time > 0 else 0
        avg_latency = round(sum(latencies) / len(latencies), 2) if latencies else 0
        max_jitter = max(latencies) - min(latencies) if latencies else 0
        packet_loss = (1 - dl / tl) * 100 if tl else 0
        if verbose:
            print(f"{elapsed_time:.2f}s: Download completed.")
        return avg, elapsed_time, avg_latency, max_jitter, packet_loss
    except Exception as e:
        # Best-effort boundary: report the failure and let the caller skip this host.
        print(f"An error occurred: {e}")
        return None
# ASCII-art banner text; printed below at module level, i.e. every time this
# file is executed or imported.
intro_art = '''
# #
# # ###### ##### ###### # # ###### #####
# # # # # ## # # # #
####### ##### # # # # # ##### # #
# # # # # # # # # #####
# # # # # # ## # # #
# # ###### # ###### # # ###### # #
##### #######
# # ##### ###### ###### ##### # ###### #### #####
# # # # # # # # # # #
##### # # ##### ##### # # # ##### #### #
# ##### # # # # # # # #
# # # # # # # # # # # #
##### # ###### ###### ##### # ###### #### #
'''
# Show the banner before anything else runs.
print(intro_art)
# Test-file URLs per endpoint, keyed first by hostname and then by size code
# ("sm", "md", "lg"). Every endpoint serves the same three file names; only
# the URL scheme differs between hosts.
hosts = {
    host: {
        size: f"{scheme}://{host}/{filename}"
        for size, filename in (
            ("sm", "100MB.bin"),
            ("md", "1GB.bin"),
            ("lg", "10GB.bin"),
        )
    }
    for host, scheme in (
        ("fsn1-speed.hetzner.com", "https"),
        ("hel1-speed.hetzner.com", "https"),
        ("speed.hetzner.de", "https"),
        ("ash.icmp.hetzner.com", "http"),
        ("hil.icmp.hetzner.com", "http"),
    )
}

# Human-readable location shown for each endpoint hostname.
loc_map = {
    "fsn1-speed.hetzner.com": "Falkenstein, Germany",
    "hel1-speed.hetzner.com": "Helsinki, Finland",
    "speed.hetzner.de": "Nuremberg, Germany",
    "ash.icmp.hetzner.com": "Ashburn, Virginia, USA",
    "hil.icmp.hetzner.com": "Hillsboro, Oregon, USA",
}
def convert_size(s):
    """Format a byte count as a human-readable string using 1024-based units.

    Args:
        s: Size in bytes (int or float).

    Returns:
        The value rounded to two decimals followed by its unit, e.g.
        ``"1.5KB"``. Zero and negative inputs yield ``"0B"``. Values below one
        byte stay in ``B`` and values beyond the largest unit stay in ``YB``.
    """
    if s <= 0:
        return "0B"
    units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    value = float(s)
    idx = 0
    # Repeated division keeps the unit index inside the table. A floating
    # point logarithm yields -1 for 0 < s < 1 (which would select "YB") and
    # can land on the wrong side of an exact power of 1024.
    while value >= 1024 and idx < len(units) - 1:
        value /= 1024
        idx += 1
    return f"{round(value, 2)}{units[idx]}"
def get_latency(h):
    """Return the round-trip time to host ``h`` in milliseconds from one ping.

    Runs the system ``ping`` command once (``ping -c 1``) and parses the
    ``time=`` field of its output.

    Args:
        h: Hostname or address to ping.

    Returns:
        The reported time as a float. Returns 0 on Windows (the probe is not
        attempted there), when ``ping`` is unavailable, fails or times out,
        or when its output contains no parsable ``time=`` value.
    """
    if platform.system() == "Windows":
        return 0
    try:
        raw = subprocess.check_output(
            ["ping", "-c", "1", h],
            stderr=subprocess.DEVNULL,  # keep ping errors off the progress bar line
            timeout=10,  # never let a silent host stall the caller indefinitely
        )
    except (subprocess.SubprocessError, OSError):
        # Non-zero exit, timeout, or ping not installed: best-effort probe.
        return 0
    for line in raw.decode("utf-8", errors="replace").splitlines():
        if "time=" in line:
            try:
                return float(line.split("time=")[1].split(" ")[0])
            except ValueError:
                return 0
    # No reply line found; return a number so callers can sum/compare results.
    return 0
def downloadFile(u, hostname):
    """Stream ``u``, discard the payload, and return transfer statistics.

    A progress bar is drawn on stdout while the download runs, and
    ``get_latency(hostname)`` is sampled every few seconds during the
    transfer.

    Args:
        u: URL of the test file to download.
        hostname: Host handed to ``get_latency`` for the latency samples.

    Returns:
        A tuple ``(avg, elapsed_time, avg_latency, max_jitter, packet_loss)``:
        ``avg`` is the mean throughput in bytes per second, ``elapsed_time``
        the wall-clock transfer time in seconds, ``avg_latency`` the mean of
        the latency samples, ``max_jitter`` the spread between the highest
        and lowest sample, and ``packet_loss`` the percentage of the announced
        Content-Length that was not received. Returns ``None`` if the server
        does not answer 200 or any error occurs.
    """
    chunk_size = 64 * 1024  # 1 KiB chunks make the Python loop the bottleneck
    latency_check_interval = 5  # seconds between latency probes
    began = time.perf_counter()

    def stamp():
        # Seconds since the call started, used as a prefix for log lines.
        return f"{time.perf_counter() - began:.2f}s"

    try:
        if verbose:
            print(f"{stamp()}: Attempting to download from {u}...")
        retry = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
        adapter = HTTPAdapter(max_retries=retry)
        headers = {'User-Agent': 'Mozilla/5.0'}
        # Context managers make sure the connection pool and the streamed
        # response are released on every exit path.
        with requests.Session() as session:
            session.mount('http://', adapter)
            session.mount('https://', adapter)
            if verbose:
                print(f"{stamp()}: Session initialized. Sending GET request...")
            with session.get(u, headers=headers, stream=True, timeout=10) as r:
                if r.status_code != 200:
                    print(f"{stamp()}: GET request failed with status code {r.status_code}")
                    return None
                if verbose:
                    print(f"{stamp()}: GET request successful. Streaming content...")
                # The header may be absent (e.g. chunked transfer); 0 means "unknown".
                tl = int(r.headers.get('content-length') or 0)
                if verbose:
                    print(f"{stamp()}: Content length: {tl} bytes.")
                latencies = []
                dl = 0
                shown_pct = -1
                # Wall-clock timer: CPU time (time.process_time) excludes the
                # time spent waiting on the network and inflates the speed.
                start_time = time.perf_counter()
                last_latency_check = start_time
                # The payload is only counted, never written anywhere, so no
                # platform-specific null device is needed.
                for c in r.iter_content(chunk_size):
                    dl += len(c)
                    now = time.perf_counter()
                    if now - last_latency_check > latency_check_interval:
                        sample = get_latency(hostname)
                        if sample is not None:  # skip probes that produced no value
                            latencies.append(sample)
                        last_latency_check = now
                    if tl:
                        pct = min(100, int(100 * dl / tl))
                        if pct != shown_pct:  # redraw only when the bar changes
                            p = pct // 2
                            sys.stdout.write(f"\r[{'#' * p}{'.' * (50 - p)}] {pct}%")
                            sys.stdout.flush()
                            shown_pct = pct
                elapsed_time = time.perf_counter() - start_time
        avg = round(dl / elapsed_time, 2) if elapsed_time > 0 else 0
        avg_latency = round(sum(latencies) / len(latencies), 2) if latencies else 0
        max_jitter = max(latencies) - min(latencies) if latencies else 0
        packet_loss = (1 - dl / tl) * 100 if tl else 0
        if verbose:
            print(f"{stamp()}: Download completed.")
        return avg, elapsed_time, avg_latency, max_jitter, packet_loss
    except Exception as e:
        # Best-effort boundary: report the failure and let the caller skip this host.
        print(f"{stamp()}: An error occurred: {e}")
        return None
def main():
    """Run the speed test against every configured host and print a summary.

    The test size (``sm``, ``md`` or ``lg``) is taken from the command line,
    in any argument position; if none is given the user is prompted for it.
    ``-h`` prints the usage text and exits. For each entry in ``hosts`` the
    matching file is downloaded and the per-host figures are printed; a
    results section comparing the hosts follows. Hosts whose download fails
    are skipped, and if none succeeds only a notice is printed.
    """
    sizes = ("sm", "md", "lg")
    usage_lines = (
        "Usage: python3 hetzner-speedtest.py [sm] Small 100MB",
        " [md] Medium 1GB",
        " [lg] Large 10GB",
    )
    args = sys.argv[1:]

    if "-h" in args:
        for line in usage_lines:
            print(line)
        print(" -v verbose output")
        return

    # Accept the size in any position so "-v sm" works as well as "sm -v".
    sz = next((arg for arg in args if arg in sizes), None)
    if sz is None:
        for line in usage_lines:
            print(line)
        sz = input("Enter size (sm, md, lg): ").strip()
        if sz not in sizes:
            print(f"Unknown size {sz!r}; expected one of sm, md, lg.")
            return

    lat_res = {}
    spd_res = {}
    time_res = {}
    max_jitter_res = {}
    packet_loss_res = {}

    for h in hosts:
        loc = loc_map.get(h, h)
        print(f"\nTesting {loc}")
        download_result = downloadFile(hosts[h][sz], h)
        if download_result is None:
            print(f"Could not complete the download for {loc}. Skipping...")
            continue
        avg_spd, elapsed_time, avg_latency, max_jitter, packet_loss = download_result
        # Treat a missing latency reading as 0 so it can be compared and formatted.
        lat_res[loc] = get_latency(h) or 0
        spd_res[loc] = avg_spd
        time_res[loc] = elapsed_time
        max_jitter_res[loc] = max_jitter
        packet_loss_res[loc] = packet_loss
        print(f"\nSpeed: {convert_size(avg_spd)}/s")
        print(f"Time: {elapsed_time:.2f} seconds")
        print(f"Latency: {lat_res[loc]}ms")
        print(f"Avg Latency: {avg_latency}ms")
        print(f"Max Jitter: {max_jitter}ms")
        print(f"Packet Loss: {packet_loss:.2f}%")

    if not lat_res:
        # Nothing to rank: stop here instead of dereferencing unset winners.
        print("No successful streams established to analyse.")
        return

    min_lat_loc = min(lat_res, key=lat_res.get)
    max_spd_loc = max(spd_res, key=spd_res.get)
    min_time_loc = min(time_res, key=time_res.get)
    min_jitter_loc = min(max_jitter_res, key=max_jitter_res.get)
    max_jitter_loc = max(max_jitter_res, key=max_jitter_res.get)

    print("\nResults:")
    print(f"Lowest Latency: {min_lat_loc.ljust(25)}")
    print(f"(Latency: {lat_res[min_lat_loc]:.3f}ms; Time: {time_res[min_lat_loc]:.2f}s; Speed: {convert_size(spd_res[min_lat_loc])}/s)")
    print(f"Highest Speed: {max_spd_loc.ljust(25)}")
    print(f"(Latency: {lat_res[max_spd_loc]:.3f}ms; Time: {time_res[max_spd_loc]:.2f}s; Speed: {convert_size(spd_res[max_spd_loc])}/s)")
    print(f"Fastest Time: {min_time_loc.ljust(25)}")
    print(f"(Latency: {lat_res[min_time_loc]:.3f}ms; Time: {time_res[min_time_loc]:.2f}s; Speed: {convert_size(spd_res[min_time_loc])}/s)")
    # Jitter is a latency spread in milliseconds, so it is not run through the
    # byte-size formatter.
    print(f"Lowest Jitter: {min_jitter_loc.ljust(25)} (Jitter: {max_jitter_res[min_jitter_loc]:.3f}ms; Packet Loss: {packet_loss_res[min_jitter_loc]:.2f}%)")
    print(f"Highest Jitter: {max_jitter_loc.ljust(25)} (Jitter: {max_jitter_res[max_jitter_loc]:.3f}ms; Packet Loss: {packet_loss_res[max_jitter_loc]:.2f}%)")
# Script entry point: run the speed test only when executed directly.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C during a long download: exit quietly instead of dumping a
        # traceback. 130 is the conventional exit status for SIGINT.
        print("\nInterrupted.")
        sys.exit(130)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment