Skip to content

Instantly share code, notes, and snippets.

@siddontang
Created June 7, 2023 04:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save siddontang/7109c5fc607e165e750ca61ad5f8d476 to your computer and use it in GitHub Desktop.
Save siddontang/7109c5fc607e165e750ca61ad5f8d476 to your computer and use it in GitHub Desktop.
import subprocess
import socket
import time
import psutil
import glob
import requests
import concurrent.futures
def find_available_ports(num_ports):
    """Return ``num_ports`` distinct TCP ports that are currently free on localhost.

    Each port is obtained by binding a socket to port 0, which lets the OS
    pick an unused ephemeral port. All probe sockets are kept open until
    every port has been collected, so the OS cannot hand the same port out
    twice within one call.

    The sockets are closed before returning, so another process may still
    grab a port between this call and the moment the caller binds it.

    Args:
        num_ports: Number of ports to reserve; ``0`` yields an empty list.

    Returns:
        A list of ``num_ports`` port numbers (ints).
    """
    probes = []
    try:
        for _ in range(num_ports):
            probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # Register before bind() so the socket is closed even if bind fails.
            probes.append(probe)
            probe.bind(('localhost', 0))
        # getsockname() on an AF_INET socket returns (host, port).
        return [probe.getsockname()[1] for probe in probes]
    finally:
        for probe in probes:
            probe.close()
def start_tidb_server(namespace, port1, port2,
                      tidb_bin='/Users/tangliu/eng/tidb/bin/tidb-server',
                      pd_addr='127.0.0.1:2379'):
    """Launch a tidb-server process bound to the given keyspace.

    Args:
        namespace: Keyspace name; also used as the sub-directory of ``./var``
            that receives the server's log files.
        port1: Port passed to ``-P`` (the one clients connect to).
        port2: Port passed to ``--status``.
        tidb_bin: Path to the tidb-server executable. Defaults to the
            original hard-coded location so existing callers are unaffected.
        pd_addr: ``host:port`` passed to ``--path``. Defaults to the
            original hard-coded address.

    Returns:
        The ``subprocess.Popen`` handle of the started server. The call does
        not wait for the server to become ready.
    """
    command = [
        tidb_bin,
        '--keyspace-name={}'.format(namespace),
        '-P', str(port1),
        '--status', str(port2),
        '-log-slow-query=./var/{}/tidb_slow.log'.format(namespace),
        '-log-file=./var/{}/tidb.log'.format(namespace),
        '--store=tikv',
        '--path={}'.format(pd_addr),
    ]
    return subprocess.Popen(command)
def check_tidb_server(port1):
    """Block until a ``select 1`` through the mysql client succeeds.

    The probe connects to 127.0.0.1 on ``port1`` as root. A non-zero exit
    from the client is treated as "not ready yet" and retried after a
    3-second pause. There is no upper bound on the number of retries, and
    any error other than ``CalledProcessError`` propagates to the caller.
    """
    probe = ['mysql', '-h', '127.0.0.1', '-P', str(port1), '-uroot', '-e', 'select 1']
    ready = False
    while not ready:
        try:
            subprocess.check_output(probe)
        except subprocess.CalledProcessError:
            # Server is not accepting queries yet; back off and try again.
            time.sleep(3)
        else:
            ready = True
def run_test_case(test):
    """Run one test case through ``./mysql-tester`` and report its exit status.

    Args:
        test: A ``(port1, testcase)`` pair, where ``port1`` is the port
            passed to ``-port`` and ``testcase`` is the test name handed
            to mysql-tester.

    Returns:
        The exit code of mysql-tester. It was previously discarded, which
        made failures invisible to callers; callers that ignore the return
        value behave exactly as before.
    """
    port1, testcase = test
    return subprocess.call(['./mysql-tester', '-port', str(port1), testcase])
def stop_tidb_server(pid):
    """Request termination of the process with the given ``pid``.

    The call only issues the terminate request through psutil; it does not
    wait for the process to exit.
    """
    psutil.Process(pid).terminate()
def process_test(test):
    """Run a single test case against its own TiDB server and keyspace.

    Steps: pick two free ports, create the keyspace by POSTing to
    ``http://127.0.0.1:2379/pd/api/v2/keyspaces``, start a TiDB server for
    it, wait until the server answers queries, run the test case, and
    finally stop the server.

    Args:
        test: A ``(namespace, testcase)`` pair. ``namespace`` is the
            keyspace name; ``testcase`` is the name given to mysql-tester.
    """
    # Local import keeps this block self-contained without touching the
    # file's top-level import list.
    import json

    namespace, testcase = test
    port1, port2 = find_available_ports(2)

    # Build the request as an argv list with a JSON-encoded payload instead
    # of an interpolated shell string, so a namespace containing quotes or
    # shell metacharacters cannot break or inject into the command.
    payload = json.dumps({"name": namespace}, separators=(',', ':'))
    create_namespace_command = [
        'curl', '-X', 'POST',
        '-H', 'Content-Type: application/json',
        '-d', payload,
        'http://127.0.0.1:2379/pd/api/v2/keyspaces',
    ]
    subprocess.call(create_namespace_command)
    print(f"Namespace '{namespace}' created")

    tidb_process = start_tidb_server(namespace, port1, port2)
    print(f"TiDB server started for test '{namespace}' with ports: {port1}, {port2}")
    try:
        check_tidb_server(port1)
        print(f"TiDB server for test '{namespace}' is running")
        run_test_case((port1, testcase))
        print(f"Test case for test '{namespace}' finished")
    finally:
        # Always stop the server, even if readiness checking or the test
        # itself raised, so no tidb-server process is left behind.
        stop_tidb_server(tidb_process.pid)
        print(f"TiDB server for test '{namespace}' stopped")
if __name__ == '__main__':
    # Discover every *.test file under ./t and derive, for each one:
    #   testcase  - path relative to ./t, cut at the first '.'
    #   namespace - the same value with '/' replaced by '_'
    test_files = glob.glob('./t/**/*.test', recursive=True)
    tests = []
    for test_file in test_files:
        relative = test_file.replace('./t/', '')
        testcase = relative.split('.')[0]
        namespace = relative.replace('/', '_').split('.')[0]
        tests.append((namespace, testcase))
    print(tests)

    # Run at most three tests concurrently. Futures are collected explicitly
    # because the lazy iterator returned by executor.map() was never
    # consumed, which silently discarded any exception raised in a worker.
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        pending = {executor.submit(process_test, test): test for test in tests}
        for future in concurrent.futures.as_completed(pending):
            error = future.exception()
            if error is not None:
                failed_namespace, _ = pending[future]
                print(f"Test '{failed_namespace}' failed with error: {error!r}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment