Skip to content

Instantly share code, notes, and snippets.

@travislee89
Last active November 23, 2021 09:06
Show Gist options
  • Save travislee89/5bb76d329ba616a65d59a0e8fe9f0f67 to your computer and use it in GitHub Desktop.
Save travislee89/5bb76d329ba616a65d59a0e8fe9f0f67 to your computer and use it in GitHub Desktop.
Test the connectivity of your ladder.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import yaml
import requests
import os
import sys
import logging
import time
import argparse
import signal
import threading
from subprocess import Popen, PIPE
default_url = 'https://ifconfig.me/ip'
default_down_url = 'https://dl.google.com/chrome/mac/universal/stable/GGRO/googlechrome.dmg'
default_clash_bin = 'clash'
downloaded = 0
class Config(object):
def __init__(self, port=57890, clash_bin=default_clash_bin, url=default_url, down_url=default_down_url):
self.config = {}
self.config_file = '/tmp/ladder_test.yaml'
self.clash_bin = clash_bin
self.port = port
self.init_config(self.port)
self.proxies = {'http': 'http://127.0.0.1:%d' % self.port, 'https': 'http://127.0.0.1:%d' % self.port}
self.process = None
self.timeout = 10
self.clash_stdout = ''
self.clash_stderr = ''
self.session = self._new_session()
self.url = url
self.down_url = down_url
def init_config(self, port: int):
self.config = {
'port': port,
'socks-port': port + 1,
'external-controller': '127.0.0.1:%d' % (port + 2),
'allow-lan': False,
'log-level': 'info',
'mode': 'Rule',
'proxies': [],
'proxy-groups': [{'name': 'Default',
'type': 'load-balance',
'proxies': [],
'url': 'https://www.gstatic.com/generate_204',
'interval': 60,
}],
'rules': ['DOMAIN-SUFFIX,.,Default', 'MATCH,Default']
}
def _new_session(self):
s = requests.Session()
s.proxies = self.proxies
s.adapters.DEFAULT_RETRIES = 3
return s
def change_node(self, node: dict, name: str):
self.config['proxies'].clear()
self.config['proxy-groups'][0]['proxies'].clear()
self.config['proxies'].append(node)
self.config['proxy-groups'][0]['proxies'].append(name)
def test_config(self) -> bool:
logger.info('Testing node config')
test_command = [self.clash_bin, '-t', '-f', self.config_file]
test_process = Popen(test_command, stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True)
test_process.wait()
result_test = test_process.returncode
if result_test != 0:
logger.error('config INVALID, SKIP test this node')
return False
logger.info('config file valid')
return True
def target(self):
logger.info('Thread started')
test_command = [self.clash_bin, '-f', self.config_file]
self.process = Popen(test_command, stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True)
self.clash_stdout, self.clash_stderr = self.process.communicate()
logger.info('Thread finished')
def test(self):
logger.info('Testing node')
thread = threading.Thread(target=self.target)
thread.start()
time.sleep(5)
try:
r = self.session.get(self.url, timeout=10)
except requests.exceptions.Timeout:
logger.error('Timeout')
except requests.exceptions.ConnectionError:
logger.error('Connection error')
except requests.exceptions.RequestException as e:
logger.error(e)
except Exception as e:
logger.error(e)
else:
logger.info('Response Status Code: %s' % r.status_code)
logger.info('Response Body: %s' % r.text[:100])
finally:
self.download_file()
logger.info('Terminating process')
self.process.send_signal(signal.SIGINT)
if self.clash_stdout:
logger.info(self.clash_stdout)
if self.clash_stderr:
logger.error(self.clash_stderr)
logger.info('------test finish------')
# thread.join(self.timeout)
# if thread.is_alive():
# logger.info('Terminating process')
# self.process.send_signal(signal.SIGINT)
# logger.info(self.process.returncode)
# thread.join()
def download_file(self):
logger.info('Testing download speed')
logger.info('URL: %s' % self.down_url)
with open('/dev/null', 'wb') as f:
start_time = time.time()
r = self.session.get(self.down_url, stream=True)
total_length = r.headers.get('content-length')
dl = 0
if total_length is None: # no content length header
f.write(r.content)
else:
global downloaded
total = int(total_length)
total_mb = int(total / 1024**2)
logger.info('Downloading test file of size %s MB...' % total_mb)
for data in r.iter_content(chunk_size=max(int(total / 1000), 1024**2)):
downloaded += len(data)
downloaded_mb = int((downloaded / 1024**2))
f.write(data)
done = int(50 * downloaded / total)
sys.stdout.write('\r[{}{}] {}% {}/{} MB'.format('█' * done, '.' * (50 - done),
round(downloaded/total * 100, 2),
downloaded_mb, total_mb))
sys.stdout.write('\n')
logger.info('Downloaded %s Bytes(%s MB)' % (downloaded, int(downloaded/1024**2)))
sys.stdout.flush()
download_time = time.time() - start_time
speed = int(downloaded / download_time / 1024 ** 2)
logger.info('Download time: %s' % time.strftime('%H:%M:%S', time.gmtime(download_time)))
if downloaded == total:
logger.info('Testing download speed finished')
else:
logger.info('Testing download speed uncompleted')
logger.info('Download speed: %s MB/s' % speed)
def run(self):
with open(self.config_file, 'w', encoding='utf8') as config_file:
yaml.dump(self.config, config_file)
self.node_info()
config_invalid = self.test_config()
if config_invalid is False:
return False
self.test()
self.clean_temp_file()
def node_info(self):
if isinstance(self.config['proxies'], list) and len(self.config['proxies']) == 1:
_proxy = self.config['proxies'][0]
if isinstance(_proxy, dict):
logger.info('node name: %s; type: %s; server: %s'
% (_proxy.get('name'), _proxy.get('type'), _proxy.get('server')))
def clean_temp_file(self):
if os.path.exists(self.config_file):
os.remove(self.config_file)
def load_clash(_clash_config_file: str):
with open(_clash_config_file, 'r', encoding='utf8') as f:
try:
_clash_config = yaml.safe_load(f)
except yaml.YAMLError as exc:
print(exc)
sys.exit(1)
else:
return _clash_config
def get_logger(name=__name__, level=None):
fmt = '[%(asctime)s][%(levelname)s] %(message)s'
logging.basicConfig(format=fmt)
_logger = logging.getLogger(name)
if level is None:
level = logging.INFO
_logger.setLevel(level)
if len(_logger.handlers) == 0:
handler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter(fmt)
handler.setFormatter(formatter)
_logger.addHandler(handler)
_logger.propagate = False
return _logger
if __name__ == '__main__':
logger = get_logger()
parser = argparse.ArgumentParser(description='test your ladder')
parser.add_argument('-c', '--clash-conf', type=str, default=None)
parser.add_argument('-b', '--clash-bin', type=str, default=default_clash_bin)
parser.add_argument('-u', '--url', type=str, default=default_url)
parser.add_argument('-d', '--down-url', type=str, default=default_down_url)
param = parser.parse_args()
config = Config(clash_bin=param.clash_bin, url=param.url, down_url=param.down_url)
logger.info('Loading config file: %s' % param.clash_conf)
clash_config = load_clash(param.clash_conf)
if isinstance(clash_config, dict):
node_number = len(clash_config['proxies'])
logger.info('clash node number: %d' % node_number)
for proxy in clash_config['proxies']:
config.change_node(node=proxy, name=proxy['name'])
config.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment