Skip to content

Instantly share code, notes, and snippets.

@12101111
Forked from lilydjwg/gh-check
Created April 16, 2020 06:22
Show Gist options
  • Save 12101111/7c4f82e30d37d1fdbe3dd4d5b2479e6c to your computer and use it in GitHub Desktop.
Save 12101111/7c4f82e30d37d1fdbe3dd4d5b2479e6c to your computer and use it in GitHub Desktop.
gh-check: speed test to known GitHub IPs
#!/usr/bin/python3
import asyncio
import time
import socket
import argparse
import aiohttp
class MyConnector(aiohttp.TCPConnector):
    """TCP connector that sends every connection to one fixed IP address.

    The host name requested by the caller is kept in the resolver record
    under ``'hostname'``; only the address under ``'host'`` is replaced by
    the IP given at construction time.
    """

    def __init__(self, ip):
        """Remember *ip* and initialise the base connector with defaults."""
        self.__ip = ip
        super().__init__()

    async def _resolve_host(
        self, host: str, port: int,
        traces: None = None,
    ):
        """Return a single resolver record pointing *host* at the fixed IP.

        No DNS lookup is performed; *traces* is accepted but not used.
        """
        pinned = dict(
            hostname=host,
            host=self.__ip,
            port=port,
            family=self._family,
            proto=0,
            flags=0,
        )
        return [pinned]
async def test_domain(domain, ip, proto):
if proto == 'http':
return await test_domain_http(domain, ip)
elif proto == 'ssh':
return await test_domain_ssh(domain, ip)
else:
raise ValueError('unknown proto', proto)
async def test_domain_ssh(domain, ip):
st = time.time()
r, _w = await asyncio.open_connection(ip, 22)
await r.read(1)
return time.time() - st
async def test_domain_http(domain, ip):
    """Time a full HTTPS GET of ``https://github.com/`` served from *ip*.

    The request always targets ``https://github.com/``; ``MyConnector``
    routes the connection to *ip*. The whole session is limited to a total
    of 10 seconds.

    Args:
        domain: Label of the target; not used by the probe itself.
        ip: Address the connection is routed to.

    Returns:
        Elapsed time in seconds as a float, covering session setup, the
        request, reading the body as text, and session teardown.
    """
    url = 'https://github.com/'
    # Monotonic clock: immune to wall-clock adjustments during the probe.
    st = time.monotonic()
    async with aiohttp.ClientSession(
        connector=MyConnector(ip),
        timeout=aiohttp.ClientTimeout(total=10),
    ) as s:
        # Context manager releases the response even if reading it fails.
        async with s.get(url) as r:
            await r.text()
    return time.monotonic() - st
async def producer(q, proto):
    """Fill *q* with every item from ``get_items(proto)``, then ``None``.

    The trailing ``None`` is put on the queue after the last item.
    """
    for entry in await get_items(proto):
        await q.put(entry)
    await q.put(None)
async def printer(q):
    """Print one aligned line per result taken from *q* until cancelled.

    Each queue entry is ``((domain, ip, proto), outcome)``. When *outcome*
    is an ``Exception`` its ``repr`` is printed; otherwise it is formatted
    as a number with two decimals. Cancellation while waiting on the queue
    ends the coroutine normally.
    """
    while True:
        try:
            entry = await q.get()
        except asyncio.CancelledError:
            return
        if not isinstance(entry[1], Exception):
            (domain, ip, proto), t = entry
            print(f'{domain:21} {ip:15} {proto:4} {t:6.2f}')
        else:
            (domain, ip, proto), e = entry
            print(f'{domain:21} {ip:15} {proto:4} {e!r}')
async def fastest_finder(q):
    """Track the IP with the smallest measured time among results in *q*.

    Each queue entry is ``((domain, ip, proto), outcome)``; entries whose
    outcome is an ``Exception`` are ignored. Only times below the initial
    threshold of 1000 can be selected.

    Returns:
        When cancelled while waiting on the queue: the best IP seen so far,
        or ``None`` if no entry qualified.
    """
    best_ip = None
    best_time = 1000
    while True:
        try:
            entry = await q.get()
        except asyncio.CancelledError:
            return best_ip
        if isinstance(entry[1], Exception):
            continue
        (_domain, ip, _proto), elapsed = entry
        if elapsed < best_time:
            best_ip, best_time = ip, elapsed
async def worker(q, ret_q):
    """Consume jobs from *q*, probe each one and report on *ret_q*.

    Every job is passed to ``test_domain(*job)``. The pair
    ``(job, outcome)`` is put on *ret_q*, where *outcome* is either the
    return value or the ``Exception`` that was raised. A ``None`` job is
    put back on *q* before this worker stops.
    """
    while True:
        job = await q.get()
        if job is None:
            # Hand the sentinel back before stopping.
            await q.put(None)
            return
        try:
            outcome = await test_domain(*job)
        except Exception as exc:
            outcome = exc
        await ret_q.put((job, outcome))
async def main(proto):
    """Probe every known address for the given protocols and print results.

    Starts 40 ``worker`` tasks, one ``producer`` task and one ``printer``
    task, connected through a job queue and a result queue, and returns
    once all workers have finished and the printer has been stopped.

    Args:
        proto: Protocols handed to ``producer`` (e.g. ``['http', 'ssh']``).

    Raises:
        Whatever the producer raises; in that case the workers and the
        printer are cancelled instead of being left waiting forever.
    """
    q = asyncio.Queue()
    ret_q = asyncio.Queue()
    # asyncio.wait() needs Tasks/Futures: passing bare coroutines is
    # rejected on Python 3.11+.
    workers = [asyncio.ensure_future(worker(q, ret_q)) for _ in range(40)]
    producer_fu = asyncio.ensure_future(producer(q, proto))
    printer_fu = asyncio.ensure_future(printer(ret_q))
    try:
        # Surface producer failures first; otherwise the workers would
        # block on an empty job queue with nothing to end them.
        await producer_fu
        await asyncio.wait(workers)
        # Give the printer a chance to consume everything already queued
        # before it is cancelled.
        while not ret_q.empty():
            await asyncio.sleep(0)
    finally:
        for fu in workers:
            fu.cancel()  # no-op for workers that already finished
        printer_fu.cancel()
    await printer_fu
async def update_hosts():
    """Find the fastest address over HTTP and write it into /etc/hosts.

    Exits with ``'not root?'`` unless the effective UID is 0. Otherwise it
    runs 40 ``worker`` tasks on the ``'http'`` items, lets
    ``fastest_finder`` pick an IP from the results and, if one was found,
    runs ``sed -i`` on ``/etc/hosts`` to replace the first field of lines
    matching a ``github.com`` entry with that IP.

    Raises:
        SystemExit: when not running as root.
        subprocess.CalledProcessError: when ``sed`` exits non-zero.
        Whatever the producer raises; the workers and the finder are
        cancelled in that case instead of being left waiting forever.
    """
    import os
    import sys
    import subprocess

    if os.geteuid() != 0:
        sys.exit('not root?')

    q = asyncio.Queue()
    ret_q = asyncio.Queue()
    # asyncio.wait() needs Tasks/Futures: passing bare coroutines is
    # rejected on Python 3.11+.
    workers = [asyncio.ensure_future(worker(q, ret_q)) for _ in range(40)]
    producer_fu = asyncio.ensure_future(
        producer(q, ['http']))
    finder_fu = asyncio.ensure_future(
        fastest_finder(ret_q))
    try:
        # Surface producer failures first; otherwise the workers would
        # block on an empty job queue with nothing to end them.
        await producer_fu
        await asyncio.wait(workers)
        # Let the finder consume every queued result before cancelling it.
        while not ret_q.empty():
            await asyncio.sleep(0)
    finally:
        for fu in workers:
            fu.cancel()  # no-op for workers that already finished
        finder_fu.cancel()

    ip = await finder_fu
    if ip is not None:
        cmd = ['sed', '-i', rf'/^[0-9.]\+[[:space:]]\+github\.com\>/s/[^[:space:]]\+/{ip}/', '/etc/hosts']
        subprocess.check_call(cmd)
async def resolve(domain):
    """Look up the IPv4/TCP addresses of *domain*.

    Returns:
        A ``(domain, ips)`` tuple where *ips* lists the address string of
        each ``getaddrinfo`` result, in the order returned.
    """
    infos = await asyncio.get_event_loop().getaddrinfo(
        domain, None,
        family=socket.AF_INET,
        proto=socket.IPPROTO_TCP,
    )
    ips = []
    for info in infos:
        sockaddr = info[-1]  # last field of each entry is the sockaddr
        ips.append(sockaddr[0])
    return domain, ips
async def get_items(proto):
    """Build the list of ``(label, ip, protocol)`` probes to run.

    The four ``*-region.github.com`` names are resolved concurrently via
    ``resolve``; each resulting IP is listed under its domain name, in the
    order the domains are declared. A fixed table of labelled addresses is
    appended afterwards. A domain whose lookup fails is reported on stdout
    and skipped rather than aborting the whole run.

    Args:
        proto: Iterable of protocol names; every address is paired with
            each of them.

    Returns:
        A list of ``(label, ip, protocol)`` tuples.
    """
    domains = [
        "ams-region.github.com",
        "sea-region.github.com",
        "iad-region.github.com",
        "sin-region.github.com",
    ]
    print('Resolving domains...', flush=True, end='')
    # gather() accepts coroutines on every supported Python version and
    # keeps results aligned with `domains`; asyncio.wait() rejects bare
    # coroutines on 3.11+ and returns an unordered set.
    results = await asyncio.gather(
        *(resolve(domain) for domain in domains),
        return_exceptions=True,
    )
    items = []
    failures = []
    for domain, result in zip(domains, results):
        if isinstance(result, Exception):
            failures.append((domain, result))
            continue
        if isinstance(result, BaseException):
            # e.g. cancellation: not a lookup failure, propagate it.
            raise result
        _, ips = result
        items.extend((domain, ip) for ip in ips)
    print('done.')
    for domain, err in failures:
        print(f'{domain}: resolve failed: {err!r}')
    items += [
        ('Sydney', '13.237.44.5'),
        ('Sydney', '52.64.108.95'),
        ('Sydney', '13.236.229.21'),
        ('Sao Paulo', '18.231.5.6'),
        ('Sao Paulo', '18.228.52.138'),
        ('Sao Paulo', '18.228.67.229'),
        ('Seoul', '15.164.81.167'),
        ('Seoul', '52.78.231.108'),
        ('Seattle', '52.192.72.89'),
        ('Tokyo', '13.114.40.48'),
        ('Tokyo', '52.69.186.44'),
        ('Bombay', '13.234.210.38'),
        ('Bombay', '13.234.176.102'),
    ]
    return [(label, ip, p) for label, ip in items for p in proto]
if __name__ == '__main__':
    import logging

    # Attach a no-op handler to the root logger (presumably so that log
    # records from libraries such as asyncio stay off the terminal).
    logging.getLogger().addHandler(logging.NullHandler())

    parser = argparse.ArgumentParser(
        description='GitHub IP 访问速度测试')
    parser.add_argument('proto', nargs='*',
                        default=['http', 'ssh'],
                        help='测试指定协议')
    parser.add_argument('--hosts',
                        action='store_true',
                        help='更新 /etc/hosts')
    args = parser.parse_args()

    # --hosts rewrites /etc/hosts; otherwise just print the measurements.
    if args.hosts:
        main_fu = update_hosts()
    else:
        main_fu = main(args.proto)

    # Create the loop explicitly: asyncio.get_event_loop() without a
    # running loop is deprecated and fails on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main_fu)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to abort a run; exit quietly.
        pass
    finally:
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment