Skip to content

Instantly share code, notes, and snippets.

@malwaremily
Created February 27, 2021 05:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save malwaremily/61eb848cdd0e05a6895b153727abc6e6 to your computer and use it in GitHub Desktop.
Save malwaremily/61eb848cdd0e05a6895b153727abc6e6 to your computer and use it in GitHub Desktop.
best closest servers speedtest
def closestServers(client, all=False):
"""Determine the 5 closest speedtest.net servers based on geographic
distance
"""
urls = [
'://www.speedtest.net/speedtest-servers-static.php',
'://c.speedtest.net/speedtest-servers-static.php',
'://www.speedtest.net/speedtest-servers.php',
'://c.speedtest.net/speedtest-servers.php',
]
errors = []
servers = {}
for url in urls:
try:
request = build_request(url)
uh, e = catch_request(request)
if e:
errors.append('%s' % e)
raise SpeedtestCliServerListError
serversxml = []
while 1:
serversxml.append(uh.read(10240))
if len(serversxml[-1]) == 0:
break
if int(uh.code) != 200:
uh.close()
raise SpeedtestCliServerListError
uh.close()
try:
try:
root = ET.fromstring(''.encode().join(serversxml))
elements = root.getiterator('server')
except AttributeError: # Python3 branch
root = DOM.parseString(''.join(serversxml))
elements = root.getElementsByTagName('server')
except SyntaxError:
raise SpeedtestCliServerListError
for server in elements:
try:
attrib = server.attrib
except AttributeError:
attrib = dict(list(server.attributes.items()))
d = distance([float(client['lat']),
float(client['lon'])],
[float(attrib.get('lat')),
float(attrib.get('lon'))])
attrib['d'] = d
if d not in servers:
servers[d] = [attrib]
else:
servers[d].append(attrib)
del root
del serversxml
del elements
except SpeedtestCliServerListError:
continue
# We were able to fetch and parse the list of speedtest.net servers
if servers:
break
if not servers:
print_('Failed to retrieve list of speedtest.net servers:\n\n %s' %
'\n'.join(errors))
sys.exit(1)
closest = []
for d in sorted(servers.keys()):
for s in servers[d]:
closest.append(s)
if len(closest) == 5 and not all:
break
else:
continue
break
del servers
return closest
def getBestServer(servers):
"""Perform a speedtest.net latency request to determine which
speedtest.net server has the lowest latency
"""
results = {}
for server in servers:
cum = []
url = '%s/latency.txt' % os.path.dirname(server['url'])
urlparts = urlparse(url)
for i in range(0, 3):
try:
if urlparts[0] == 'https':
h = HTTPSConnection(urlparts[1])
else:
h = HTTPConnection(urlparts[1])
headers = {'User-Agent': user_agent}
start = timeit.default_timer()
h.request("GET", urlparts[2], headers=headers)
r = h.getresponse()
total = (timeit.default_timer() - start)
except (HTTPError, URLError, socket.error):
cum.append(3600)
continue
text = r.read(9)
if int(r.status) == 200 and text == 'test=test'.encode():
cum.append(total)
else:
cum.append(3600)
h.close()
avg = round((sum(cum) / 6) * 1000, 3)
results[avg] = server
fastest = sorted(results.keys())[0]
best = results[fastest]
best['latency'] = fastest
return best
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment