Last active
November 23, 2021 09:06
-
-
Save travislee89/5bb76d329ba616a65d59a0e8fe9f0f67 to your computer and use it in GitHub Desktop.
Test the connectivity of your ladder.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import logging
import os
import signal
import sys
import threading
import time
from subprocess import Popen, PIPE

import requests
import yaml
from requests.adapters import HTTPAdapter
# URL requested through the proxy for the connectivity check (CLI: -u/--url).
default_url = 'https://ifconfig.me/ip'
# File fetched through the proxy for the download-speed test (CLI: -d/--down-url).
default_down_url = 'https://dl.google.com/chrome/mac/universal/stable/GGRO/googlechrome.dmg'
# Name or path of the clash executable (CLI: -b/--clash-bin).
default_clash_bin = 'clash'
# Module-level byte counter that Config.download_file() updates via `global`.
downloaded = 0
class Config(object):
    """Test proxy nodes one at a time through a locally spawned clash process.

    For each node a single-proxy clash configuration is written to
    ``self.config_file``, validated with ``clash -t``, and then served by a
    clash child process while a connectivity request and a download-speed
    test are sent through it.

    The class relies on names defined at module level elsewhere in this file:
    ``logger`` and the ``default_*`` constants.
    """

    def __init__(self, port=57890, clash_bin=default_clash_bin, url=default_url, down_url=default_down_url):
        """Prepare the clash configuration skeleton and the proxied HTTP session.

        :param port: HTTP proxy port for clash; ``port + 1`` is used for SOCKS
            and ``port + 2`` for the external controller.
        :param clash_bin: name or path of the clash executable.
        :param url: URL requested for the connectivity check.
        :param down_url: URL downloaded for the speed test.
        """
        self.config = {}
        self.config_file = '/tmp/ladder_test.yaml'
        self.clash_bin = clash_bin
        self.port = port
        self.init_config(self.port)
        self.proxies = {'http': 'http://127.0.0.1:%d' % self.port, 'https': 'http://127.0.0.1:%d' % self.port}
        self.process = None
        # Seconds: request timeout and grace period when stopping clash.
        self.timeout = 10
        self.clash_stdout = ''
        self.clash_stderr = ''
        self.session = self._new_session()
        self.url = url
        self.down_url = down_url

    def init_config(self, port: int):
        """Reset ``self.config`` to an empty single-group clash configuration."""
        self.config = {
            'port': port,
            'socks-port': port + 1,
            'external-controller': '127.0.0.1:%d' % (port + 2),
            'allow-lan': False,
            'log-level': 'info',
            'mode': 'Rule',
            'proxies': [],
            'proxy-groups': [{'name': 'Default',
                              'type': 'load-balance',
                              'proxies': [],
                              'url': 'https://www.gstatic.com/generate_204',
                              'interval': 60,
                              }],
            'rules': ['DOMAIN-SUFFIX,.,Default', 'MATCH,Default'],
        }

    def _new_session(self):
        """Return a requests session that goes through the local clash proxy."""
        s = requests.Session()
        s.proxies = self.proxies
        # Retries are configured on the transport adapter; assigning
        # DEFAULT_RETRIES on the session's adapter mapping has no effect.
        adapter = HTTPAdapter(max_retries=3)
        s.mount('http://', adapter)
        s.mount('https://', adapter)
        return s

    def change_node(self, node: dict, name: str):
        """Make ``node`` the only proxy in the configuration.

        :param node: clash proxy definition.
        :param name: the node's name, referenced from the proxy group.
        """
        self.config['proxies'].clear()
        self.config['proxy-groups'][0]['proxies'].clear()
        self.config['proxies'].append(node)
        self.config['proxy-groups'][0]['proxies'].append(name)

    @staticmethod
    def _as_text(output):
        """Decode subprocess output for logging; non-bytes values pass through."""
        if isinstance(output, bytes):
            return output.decode('utf-8', errors='replace')
        return output

    def test_config(self) -> bool:
        """Validate ``self.config_file`` with ``clash -t``.

        :return: True when clash exits with status 0, False otherwise.
        """
        logger.info('Testing node config')
        test_command = [self.clash_bin, '-t', '-f', self.config_file]
        test_process = Popen(test_command, stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True)
        # communicate() drains both pipes; wait() alone can deadlock once the
        # child fills a pipe buffer.
        out, err = test_process.communicate()
        if test_process.returncode != 0:
            logger.error('config INVALID, SKIP test this node')
            for output in (out, err):
                if output:
                    logger.error(self._as_text(output))
            return False
        logger.info('config file valid')
        return True

    def target(self):
        """Thread body: run clash and collect its output once it exits."""
        logger.info('Thread started')
        test_command = [self.clash_bin, '-f', self.config_file]
        self.process = Popen(test_command, stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True)
        self.clash_stdout, self.clash_stderr = self.process.communicate()
        logger.info('Thread finished')

    def test(self):
        """Start clash, run the connectivity and download tests, then stop clash."""
        logger.info('Testing node')
        # Forget the process and output left over from the previous node.
        self.process = None
        self.clash_stdout = ''
        self.clash_stderr = ''
        thread = threading.Thread(target=self.target)
        thread.start()
        # Give clash time to start listening before sending requests.
        time.sleep(5)
        try:
            try:
                r = self.session.get(self.url, timeout=self.timeout)
            except requests.exceptions.Timeout:
                logger.error('Timeout')
            except requests.exceptions.ConnectionError:
                logger.error('Connection error')
            except requests.exceptions.RequestException as e:
                logger.error(e)
            except Exception as e:
                logger.error(e)
            else:
                logger.info('Response Status Code: %s' % r.status_code)
                logger.info('Response Body: %s' % r.text[:100])
            # The speed test runs whether or not the connectivity check passed.
            self.download_file()
        finally:
            # Always stop clash, otherwise the next node cannot bind the port.
            self._stop_clash(thread)
            logger.info('------test finish------')

    def _stop_clash(self, thread):
        """Interrupt clash, wait for the worker thread and log clash's output."""
        logger.info('Terminating process')
        if self.process is not None:
            try:
                self.process.send_signal(signal.SIGINT)
            except ProcessLookupError:
                # The process has already exited.
                pass
        thread.join(self.timeout)
        if thread.is_alive() and self.process is not None:
            logger.error('clash did not exit after SIGINT, killing it')
            self.process.kill()
            thread.join()
        # Drop pooled connections that point at the clash instance just stopped.
        self.session.close()
        if self.clash_stdout:
            logger.info(self._as_text(self.clash_stdout))
        if self.clash_stderr:
            logger.error(self._as_text(self.clash_stderr))

    @staticmethod
    def _content_length(response):
        """Return the positive Content-Length of ``response``, or None if unusable."""
        try:
            length = int(response.headers.get('content-length'))
        except (TypeError, ValueError):
            return None
        return length if length > 0 else None

    @staticmethod
    def _draw_progress(received, total):
        """Redraw the one-line progress bar on stdout."""
        done = min(int(50 * received / total), 50)
        sys.stdout.write('\r[{}{}] {}% {}/{} MB'.format('█' * done, '.' * (50 - done),
                                                        round(received / total * 100, 2),
                                                        int(received / 1024 ** 2),
                                                        int(total / 1024 ** 2)))

    def download_file(self):
        """Download ``self.down_url`` through the proxy and log size, time and speed.

        Request errors are logged instead of raised so that the caller can
        still shut clash down. The byte count of this run is also stored in
        the module-level ``downloaded`` counter.
        """
        global downloaded
        logger.info('Testing download speed')
        logger.info('URL: %s' % self.down_url)
        received = 0  # bytes of this run only; never carried over between nodes
        total = None
        completed = False
        start_time = time.time()
        try:
            r = self.session.get(self.down_url, stream=True, timeout=self.timeout)
            try:
                r.raise_for_status()
                total = self._content_length(r)
                if total is None:
                    chunk_size = 1024 ** 2
                else:
                    logger.info('Downloading test file of size %s MB...' % int(total / 1024 ** 2))
                    chunk_size = max(int(total / 1000), 1024 ** 2)
                try:
                    with open(os.devnull, 'wb') as f:
                        for data in r.iter_content(chunk_size=chunk_size):
                            received += len(data)
                            f.write(data)
                            if total is not None:
                                self._draw_progress(received, total)
                finally:
                    if total is not None:
                        # Finish the progress line before anything else is logged.
                        sys.stdout.write('\n')
                        sys.stdout.flush()
                completed = total is None or received == total
            finally:
                r.close()
        except requests.exceptions.RequestException as e:
            logger.error('Download failed: %s' % e)
        download_time = time.time() - start_time
        downloaded = received
        logger.info('Downloaded %s Bytes(%s MB)' % (received, int(received / 1024 ** 2)))
        logger.info('Download time: %s' % time.strftime('%H:%M:%S', time.gmtime(download_time)))
        if completed:
            logger.info('Testing download speed finished')
        else:
            logger.info('Testing download speed uncompleted')
        # Keep two decimals so that speeds below 1 MB/s are not reported as 0.
        speed = round(received / download_time / 1024 ** 2, 2) if download_time > 0 else 0
        logger.info('Download speed: %s MB/s' % speed)

    def run(self):
        """Write the configuration, validate it and test the current node.

        The temporary configuration file is removed in every case.

        :return: False when clash rejects the configuration, otherwise None.
        """
        try:
            with open(self.config_file, 'w', encoding='utf8') as config_file:
                yaml.dump(self.config, config_file)
            self.node_info()
            if not self.test_config():
                return False
            self.test()
        finally:
            self.clean_temp_file()

    def node_info(self):
        """Log name, type and server of the single configured node, if there is one."""
        proxies = self.config['proxies']
        if isinstance(proxies, list) and len(proxies) == 1:
            _proxy = proxies[0]
            if isinstance(_proxy, dict):
                logger.info('node name: %s; type: %s; server: %s'
                            % (_proxy.get('name'), _proxy.get('type'), _proxy.get('server')))

    def clean_temp_file(self):
        """Delete the temporary clash configuration file if it exists."""
        try:
            os.remove(self.config_file)
        except FileNotFoundError:
            pass
def load_clash(_clash_config_file: str):
    """Read a clash YAML configuration file and return the parsed document.

    :param _clash_config_file: path of the YAML file to read (UTF-8).
    :return: whatever ``yaml.safe_load`` produces for the file.

    A YAML syntax error is printed and the interpreter exits with status 1.
    """
    with open(_clash_config_file, 'r', encoding='utf8') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as parse_error:
            print(parse_error)
            sys.exit(1)
def get_logger(name=__name__, level=None):
    """Return a logger that writes formatted records to stdout.

    :param name: logger name; defaults to this module's ``__name__``.
    :param level: logging level; ``logging.INFO`` when None.
    :return: the configured ``logging.Logger``.

    Calling this again with the same name reuses the existing handler, so
    records are not duplicated.
    """
    fmt = '[%(asctime)s][%(levelname)s] %(message)s'
    logging.basicConfig(format=fmt)
    _logger = logging.getLogger(name)
    if level is None:
        level = logging.INFO
    _logger.setLevel(level)
    if not _logger.handlers:
        handler = logging.StreamHandler(stream=sys.stdout)
        handler.setFormatter(logging.Formatter(fmt))
        _logger.addHandler(handler)
    # The logger has its own stdout handler; do not also emit via the root logger.
    _logger.propagate = False
    return _logger
if __name__ == '__main__':
    # Command-line entry point: test every proxy listed in a clash configuration.
    logger = get_logger()
    parser = argparse.ArgumentParser(description='test your ladder')
    # Required: without it there is no configuration to load.
    parser.add_argument('-c', '--clash-conf', type=str, required=True,
                        help='clash configuration file whose proxies are tested')
    parser.add_argument('-b', '--clash-bin', type=str, default=default_clash_bin,
                        help='name or path of the clash executable')
    parser.add_argument('-u', '--url', type=str, default=default_url,
                        help='URL requested for the connectivity check')
    parser.add_argument('-d', '--down-url', type=str, default=default_down_url,
                        help='URL downloaded for the speed test')
    param = parser.parse_args()
    config = Config(clash_bin=param.clash_bin, url=param.url, down_url=param.down_url)
    logger.info('Loading config file: %s' % param.clash_conf)
    clash_config = load_clash(param.clash_conf)
    if not isinstance(clash_config, dict):
        logger.error('Config file does not contain a YAML mapping: %s' % param.clash_conf)
        sys.exit(1)
    # A missing or empty 'proxies' entry means there is nothing to test.
    proxies = clash_config.get('proxies') or []
    node_number = len(proxies)
    logger.info('clash node number: %d' % node_number)
    for proxy in proxies:
        if not isinstance(proxy, dict) or 'name' not in proxy:
            logger.error('Skipping proxy entry without a name: %r' % (proxy,))
            continue
        config.change_node(node=proxy, name=proxy['name'])
        config.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment