Skip to content

Instantly share code, notes, and snippets.

@maqp
Last active January 14, 2022 03:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maqp/0e5dcf542ebb97baf98d198115e931ea to your computer and use it in GitHub Desktop.
Save maqp/0e5dcf542ebb97baf98d198115e931ea to your computer and use it in GitHub Desktop.
TorBugTest.py
#!/usr/bin/env python3.9
# -*- coding: utf-8 -*-
import base64
import hashlib
import logging
import os
import random
import shlex
import socket
import sys
import tempfile
import time
from datetime import datetime
from multiprocessing import Process, Queue
from typing import Any, Optional, Union
import requests
import stem.process
from flask import Flask
from stem.control import Controller
CLIENT_QUEUE = 'client_queue'
EXPECTED_SERVER_RESPONSE = 'Hello'
SERVER_CONNECTION_DELAY = 1
# Testing
test_run = False # Set True to test util properly restarts the client when server's been down long enough.
DOWNTIME_BEFORE_NEW_ONION_SERVICE = 10 if test_run else 3600
CONNECTIONS_BEFORE_BREAK = 5
# Logging
start_timestamp = datetime.now().strftime('%d-%m-%y - %H-%M-%S')
log_file_name = f'TorBug Log - {start_timestamp}.txt'
# Onion address format
ONION_ADDRESS_CHECKSUM_ID = b'.onion checksum'
ONION_SERVICE_VERSION = b'\x03'
ONION_SERVICE_VERSION_LENGTH = 1
ONION_ADDRESS_CHECKSUM_LENGTH = 2
ONION_ADDRESS_LENGTH = 56
# VT100 codes
CURSOR_UP_ONE_LINE = '\x1b[1A'
CLEAR_ENTIRE_LINE = '\x1b[2K'
###############################################################################
# Utils #
###############################################################################
def get_timestamp() -> str:
return datetime.now().strftime('%d-%m-%y %H:%M:%S')
def seconds_to_timestamp(seconds: float) -> str:
"""Convert total seconds (e.g. 105.22) to timestamp (00:01:45.22)."""
hours, remainder = divmod(seconds, 60 * 60)
minutes, seconds = divmod(remainder, 60)
try:
fractions = str(seconds).split('.')[1]
while len(fractions) > 6:
fractions = fractions[:-1]
except IndexError:
fractions = '0'
while len(fractions) < 6:
fractions += '0'
return f"{int(hours):02d}h {int(minutes):02d}m {int(seconds):02d}.{factions[:1]}s"
def write_to_file(file_name: str, data: str) -> None:
"""Write data to file."""
with open(file_name, 'a+') as f:
f.write(data + '\n')
f.flush()
os.fsync(f.fileno())
###############################################################################
# Tor #
###############################################################################
def get_available_port(min_port: int, max_port: int) -> int:
"""Find a random available port within the given range."""
sys_rand = random.SystemRandom()
with socket.socket() as temp_sock:
while True:
try:
temp_sock.bind(('127.0.0.1', sys_rand.randint(min_port, max_port)))
break
except OSError:
pass
_, port = temp_sock.getsockname() # type: str, int
return port
def stem_compatible_ed25519_key_from_private_key(private_key: bytes) -> str:
"""Tor's custom encoding format for v3 Onion Service private keys.
This code is based on Tor's testing code at
https://github.com/torproject/tor/blob/8e84968ffbf6d284e8a877ddcde6ded40b3f5681/src/test/ed25519_exts_ref.py#L48
"""
b = 256
def bit(h: bytes, i: int) -> int:
return (h[i // 8] >> (i % 8)) & 1
def encode_int(y: int) -> bytes:
bits = [(y >> i) & 1 for i in range(b)]
return b''.join([bytes([(sum([bits[i * 8 + j] << j for j in range(8)]))]) for i in range(b // 8)])
def expand_private_key(sk: bytes) -> bytes:
h = hashlib.sha512(sk).digest()
a = 2 ** (b - 2) + sum(2 ** i * bit(h, i) for i in range(3, b - 2))
k = b''.join([bytes([h[i]]) for i in range(b // 8, b // 4)])
return encode_int(a) + k
expanded_private_key = expand_private_key(private_key)
return base64.b64encode(expanded_private_key).decode()
class Tor(object):
"""Tor class manages the starting and stopping of Tor client."""
def __init__(self) -> None:
self.tor_process = None # type: Optional[Any]
self.controller = None # type: Optional[Controller]
def connect(self, port: int) -> None:
"""Launch Tor as a subprocess."""
tor_data_directory = tempfile.TemporaryDirectory()
tor_control_socket = os.path.join(tor_data_directory.name, 'control_socket')
if not os.path.isfile('/usr/bin/tor'):
raise SystemExit("Check that Tor is installed.")
print(f"{get_timestamp()} - Server: Launching Tor...")
self.launch_tor_process(port, tor_control_socket, tor_data_directory)
start_ts = time.monotonic()
self.controller = stem.control.Controller.from_socket_file(path=tor_control_socket)
self.controller.authenticate()
while True:
time.sleep(0.1)
try:
response = self.controller.get_info("status/bootstrap-phase")
except stem.SocketClosed:
raise SystemExit("Tor socket closed.")
res_parts = shlex.split(response)
summary = res_parts[4].split('=')[1]
if summary == 'Done':
print(f"{get_timestamp()} - Server: Tor successfully started")
break
if time.monotonic() - start_ts > 15:
start_ts = time.monotonic()
self.controller = stem.control.Controller.from_socket_file(path=tor_control_socket)
self.controller.authenticate()
def launch_tor_process(self,
port: int,
tor_control_socket: Union[bytes, str],
tor_data_directory: Any
) -> None:
"""Launch Tor process."""
while True:
try:
self.tor_process = stem.process.launch_tor_with_config(
config={"DataDirectory": tor_data_directory.name,
"SocksPort": str(port),
"ControlSocket": tor_control_socket,
"AvoidDiskWrites": "1",
"Log": "notice stdout",
"GeoIPFile": "/usr/share/tor/geoip",
"GeoIPv6File ": "/usr/share/tor/geoip6"},
tor_cmd="/usr/bin/tor")
break
except OSError:
pass # Tor timed out. Try again.
def stop(self) -> None:
"""Stop the Tor subprocess."""
if self.tor_process is not None:
self.tor_process.terminate()
time.sleep(0.1)
if not self.tor_process.poll():
self.tor_process.kill()
def monitor_queues(tor: Tor,
response: Any,
) -> None:
"""Monitor queues for incoming packets."""
while True:
try:
time.sleep(0.1)
except (EOFError, KeyboardInterrupt):
pass
except stem.SocketClosed:
if tor.controller is not None:
tor.controller.remove_hidden_service(response.service_id)
tor.stop()
break
def onion_service(client_queue: 'Queue', flask_port: int) -> None:
"""Manage the Tor Onion Service and control Tor via stem."""
try:
tor_port = get_available_port(1000, 65535)
tor = Tor()
tor.connect(tor_port)
except (EOFError, KeyboardInterrupt):
return
if tor.controller is None:
raise SystemExit("No Tor controller")
try:
private_key = os.getrandom(32, flags=0)
key_blob = stem_compatible_ed25519_key_from_private_key(private_key)
response = tor.controller.create_ephemeral_hidden_service(ports={80: flask_port},
key_type='ED25519-V3',
key_content=key_blob,
await_publication=True)
print(f"{get_timestamp()} - Server: Onion Service running at http://{response.service_id}.onion\n")
# Inform client of the URL at this point
client_queue.put((tor_port, response.service_id))
except (KeyboardInterrupt, stem.SocketClosed):
tor.stop()
return
monitor_queues(tor, response)
###############################################################################
# Server #
###############################################################################
def server(flask_port: int) -> None:
"""Minimal server code."""
app = Flask(__name__)
@app.route('/')
def index() -> str:
return EXPECTED_SERVER_RESPONSE
log = logging.getLogger('werkzeug')
log.setLevel(logging.ERROR)
app.run(port=flask_port)
###############################################################################
# Client #
###############################################################################
def client(client_queue: 'Queue', restart_queue: 'Queue') -> None:
"""Run tiny client to reproduce the connection issue.
The client first waits for the Onion Service to be up, and then connects to it.
When the server goes down the client starts warning about the issue.
"""
while client_queue == 0:
time.sleep(0.1)
tor_port, onion_address = client_queue.get()
server_start_time_s = time.monotonic()
server_start_datetime = datetime.now()
server_last_seen_dt = None
server_down_timestamp = None
server_has_been_seen = False
i = 0
while True:
try:
time.sleep(SERVER_CONNECTION_DELAY)
i += 1
try:
session = requests.session()
session.proxies = {'http': f'socks5h://127.0.0.1:{tor_port}',
'https': f'socks5h://127.0.0.1:{tor_port}'}
print(f"{CURSOR_UP_ONE_LINE}{CLEAR_ENTIRE_LINE}{get_timestamp()}"
f" - Client: Connecting to {onion_address}.onion...")
if test_run and i > CONNECTIONS_BEFORE_BREAK:
# Simulate connection break by connecting to non-existing vanity address
onion_address = len(onion_address) * 'a'
response = session.get(f"http://{onion_address}.onion/").text
if response == EXPECTED_SERVER_RESPONSE:
uptime_s = time.monotonic() - server_start_time_s
print(f"{CURSOR_UP_ONE_LINE}{CLEAR_ENTIRE_LINE}{get_timestamp()}"
f" - Client: Server has been UP for {seconds_to_timestamp(uptime_s)}")
server_has_been_seen = True
server_last_seen_dt = datetime.now()
server_down_timestamp = None
except requests.exceptions.RequestException:
if server_down_timestamp is None:
print()
if server_has_been_seen:
server_down_timestamp = time.monotonic() if server_down_timestamp is None else server_down_timestamp
downtime_s = time.monotonic() - server_down_timestamp
print(f"{CURSOR_UP_ONE_LINE}{CLEAR_ENTIRE_LINE}{get_timestamp()}"
f" - Client: Server has been DOWN for {seconds_to_timestamp(downtime_s)}")
if downtime_s >= DOWNTIME_BEFORE_NEW_ONION_SERVICE:
restart_queue.put((server_start_datetime,
server_last_seen_dt,
downtime_s))
except (EOFError, KeyboardInterrupt):
pass
###############################################################################
# Main #
###############################################################################
def monitor_processes(process_list: list[Process], restart_queue: Queue) -> None:
"""Monitor the running processes."""
while True:
try:
time.sleep(0.1)
if restart_queue.qsize() != 0:
server_start_datetime, server_last_seen_dt, downtime_s = restart_queue.get() # type: datetime, datetime, float
log_msg = (f"\n{get_timestamp()} - Main: Client reports"
f"\n Server first seen : {server_start_datetime.strftime('%d-%m-%y %H:%M:%S')}"
f"\n Server last seen : {server_last_seen_dt.strftime('%d-%m-%y %H:%M:%S')}"
f"\n Server's been down for : {seconds_to_timestamp(downtime_s)}"
f"\n Restarting with new Onion Service")
print(f"\n{log_msg}\n")
write_to_file(log_file_name, log_msg)
for p in process_list:
p.terminate()
return
if not all([p.is_alive() for p in process_list]):
for p in process_list:
p.terminate()
sys.exit(1)
except (EOFError, KeyboardInterrupt):
pass
def main():
write_to_file(log_file_name, f'{get_timestamp()} - Main: Starting bug testing.')
print('')
while True:
flask_port = get_available_port(5000, 6000)
client_queue = Queue()
restart_queue = Queue()
process_list = [
Process(target=onion_service, args=(client_queue, flask_port )),
Process(target=server, args=(flask_port, )),
Process(target=client, args=(client_queue, restart_queue)),
]
for p in process_list:
p.start()
monitor_processes(process_list, restart_queue)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment