-
-
Save malwaremily/61eb848cdd0e05a6895b153727abc6e6 to your computer and use it in GitHub Desktop.
best closest servers speedtest
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def closestServers(client, all=False):
    """Determine the speedtest.net servers closest to the client.

    Each server-list URL is tried in order until one of them yields at
    least one server. Every server's attributes are collected into a dict
    and tagged with a ``'d'`` key holding the value returned by
    ``distance()`` for the client/server coordinates.

    Args:
        client: mapping providing ``'lat'`` and ``'lon'`` values that can be
            converted with ``float()``.
        all: when true, return every server found; otherwise only the five
            closest. (The name shadows the builtin but is kept so existing
            keyword callers keep working.)

    Returns:
        A list of server attribute dicts ordered by ascending distance.

    If no URL yields any server, the collected request errors are printed
    and the process exits with status 1 via ``sys.exit``.
    """
    urls = [
        '://www.speedtest.net/speedtest-servers-static.php',
        '://c.speedtest.net/speedtest-servers-static.php',
        '://www.speedtest.net/speedtest-servers.php',
        '://c.speedtest.net/speedtest-servers.php',
    ]
    errors = []
    servers = {}
    for url in urls:
        try:
            request = build_request(url)
            uh, e = catch_request(request)
            if e:
                errors.append('%s' % e)
                raise SpeedtestCliServerListError

            # Read the whole body in 10 KiB chunks. The handle is closed in
            # ``finally`` so it is released even if a read fails part-way.
            serversxml = []
            try:
                while True:
                    serversxml.append(uh.read(10240))
                    if len(serversxml[-1]) == 0:
                        break
                status = int(uh.code)
            finally:
                uh.close()
            if status != 200:
                raise SpeedtestCliServerListError

            try:
                try:
                    root = ET.fromstring(''.encode().join(serversxml))
                    # Element.getiterator() was removed in Python 3.9;
                    # prefer iter() and only fall back to getiterator() on
                    # interpreters that predate iter().
                    find_all = getattr(root, 'iter', None)
                    if find_all is None:
                        find_all = root.getiterator
                    elements = find_all('server')
                except AttributeError:
                    # NOTE(review): presumably reached when ET is not a
                    # usable ElementTree module (e.g. None) -- confirm
                    # against the file's import block.
                    root = DOM.parseString(''.join(serversxml))
                    elements = root.getElementsByTagName('server')
            except SyntaxError:
                raise SpeedtestCliServerListError

            for server in elements:
                try:
                    attrib = server.attrib
                except AttributeError:
                    # DOM nodes expose attributes through a NamedNodeMap.
                    attrib = dict(list(server.attributes.items()))
                d = distance([float(client['lat']),
                              float(client['lon'])],
                             [float(attrib.get('lat')),
                              float(attrib.get('lon'))])
                attrib['d'] = d
                # Several servers can share the same distance, so each
                # distance maps to a list of servers.
                servers.setdefault(d, []).append(attrib)

            # Drop the parsed document before a possible next attempt.
            del root, serversxml, elements
        except SpeedtestCliServerListError:
            continue

        # We were able to fetch and parse the list of speedtest.net servers
        if servers:
            break

    if not servers:
        print_('Failed to retrieve list of speedtest.net servers:\n\n %s' %
               '\n'.join(errors))
        sys.exit(1)

    # Flatten the distance buckets in ascending order of distance.
    closest = []
    for d in sorted(servers):
        closest.extend(servers[d])
    if not all:
        closest = closest[:5]
    return closest
def getBestServer(servers):
    """Pick the server with the lowest measured latency.

    For every server, ``latency.txt`` next to the server's ``'url'`` is
    requested three times. A request counts with its elapsed time when it
    answers with status 200 and a body starting with ``test=test``; any
    other outcome, including a connection error, counts as 3600.

    Args:
        servers: iterable of server dicts, each with a ``'url'`` entry.

    Returns:
        The server dict with the smallest score. The dict is modified in
        place: its ``'latency'`` key is set to that score. Servers with an
        identical score overwrite each other, so the last one seen wins.

    Raises:
        IndexError: if ``servers`` is empty.
    """
    results = {}
    for server in servers:
        cum = []
        url = '%s/latency.txt' % os.path.dirname(server['url'])
        urlparts = urlparse(url)
        for _ in range(3):
            h = None
            try:
                if urlparts[0] == 'https':
                    h = HTTPSConnection(urlparts[1])
                else:
                    h = HTTPConnection(urlparts[1])
                headers = {'User-Agent': user_agent}
                start = timeit.default_timer()
                h.request("GET", urlparts[2], headers=headers)
                r = h.getresponse()
                total = (timeit.default_timer() - start)
            except (HTTPError, URLError, socket.error):
                cum.append(3600)
            else:
                text = r.read(9)
                if int(r.status) == 200 and text == 'test=test'.encode():
                    cum.append(total)
                else:
                    cum.append(3600)
            finally:
                # Always release the connection, including when the request
                # failed or reading the response raised.
                if h is not None:
                    h.close()
        # NOTE(review): the divisor is 6 although three samples are taken;
        # presumably deliberate -- kept unchanged, confirm the intent.
        avg = round((sum(cum) / 6) * 1000, 3)
        results[avg] = server
    fastest = sorted(results)[0]
    best = results[fastest]
    best['latency'] = fastest
    return best
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment