Skip to content

Instantly share code, notes, and snippets.

@maqp
Created December 15, 2018 14:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maqp/50a736ce509e066b5f43523b68d2b703 to your computer and use it in GitHub Desktop.
Save maqp/50a736ce509e066b5f43523b68d2b703 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3.6
# -*- coding: utf-8 -*-
import base64
import hashlib
import os
import random
import shlex
import socket
import subprocess
import tempfile
import time
import stem
from stem.control import Controller
from stem import SocketClosed
from stem import process as stem_process
def get_available_port(min_port: int, max_port: int) -> str:
with socket.socket() as tmpsock:
while True:
try:
tmpsock.bind(('127.0.0.1', random.randint(min_port, max_port)))
break
except OSError:
pass
_, port = tmpsock.getsockname()
return str(port)
class Tor(object):
"""Tor class manages the starting and stopping of Tor client with Stem."""
def __init__(self) -> None:
self.tor_data_directory = None
self.tor_process = None
self.controller = None
def connect(self, port: str) -> bool:
"""Launch Tor as a subprocess."""
self.tor_data_directory = tempfile.TemporaryDirectory()
tor_control_socket = os.path.join(self.tor_data_directory.name, 'control_socket')
try:
self.tor_process = stem_process.launch_tor_with_config(
config={'DataDirectory': self.tor_data_directory.name,
'SocksPort': port,
'ControlSocket': tor_control_socket,
'AvoidDiskWrites': '1',
'Log': 'notice stdout',
'GeoIPFile': '/usr/share/tor/geoip',
'GeoIPv6File ': '/usr/share/tor/geoip6'},
tor_cmd='{}/stemtest/tor/src/app/tor'.format(os.getenv("HOME")))
except OSError:
return False
start_ts = time.monotonic()
self.controller = Controller.from_socket_file(path=tor_control_socket)
self.controller.authenticate()
while True:
time.sleep(0.1)
try:
response = self.controller.get_info("status/bootstrap-phase")
except SocketClosed:
raise SystemExit("Tor socket closed.")
res_parts = shlex.split(response)
summary = res_parts[4].split('=')[1]
if summary == 'Done':
return True
if time.monotonic() - start_ts > 15:
return False
def stop(self) -> None:
"""Stop the Tor subprocess."""
if self.tor_process:
self.tor_process.terminate()
time.sleep(0.1)
if not self.tor_process.poll():
self.tor_process.kill()
def stem_compatible_base64_blob_from_private_key(private_key: bytes) -> str:
"""Create stem compatible key blob from private key.
This code is based on Tor's testing code at
https://github.com/torproject/tor/blob/8e84968ffbf6d284e8a877ddcde6ded40b3f5681/src/test/ed25519_exts_ref.py#L48
"""
b = 256
def bit(h: bytes, i: int) -> int:
return (h[i // 8] >> (i % 8)) & 1
def encode_int(y: int) -> bytes:
bits = [(y >> i) & 1 for i in range(b)]
return b''.join([bytes([(sum([bits[i * 8 + j] << j for j in range(8)]))]) for i in range(b // 8)])
def expand_private_key(sk: bytes) -> bytes:
h = hashlib.sha512(sk).digest()
a = 2 ** (b - 2) + sum(2 ** i * bit(h, i) for i in range(3, b - 2))
k = b''.join([bytes([h[i]]) for i in range(b // 8, b // 4)])
assert len(k) == 32
return encode_int(a) + k
expanded_private_key = expand_private_key(private_key)
return base64.b64encode(expanded_private_key).decode()
def kill_background_tor():
"""Kill any Tor instances left open."""
try:
pids = subprocess.check_output("ps aux |grep '[t]orrc' | awk '{print $2}' 2>/dev/null", shell=True).split(b'\n')
for pid in pids:
subprocess.Popen("kill {}".format(int(pid)), shell=True).wait()
except ValueError:
pass
def main() -> None:
"""Repeatedly launch v3 Tor Onion Service using Stem."""
kill_background_tor()
private_key = os.urandom(32)
stem_key_data = stem_compatible_base64_blob_from_private_key(private_key)
while True:
tor_port = get_available_port(1000, 65535)
tor = Tor()
if not tor.connect(tor_port):
print("\nTor timed out!")
continue
print('Starting Onion Service... ', end='', flush=True)
try:
service = tor.controller.create_ephemeral_hidden_service(ports={80: 5000},
key_type='ED25519-V3',
key_content=stem_key_data,
await_publication=True)
except stem.OperationFailed as e:
print("OperationFailed:\n{}".format(e))
tor.stop()
raise
print('Onion Service {} is now online'.format(service.service_id))
tor.controller.remove_hidden_service(service.service_id)
tor.stop()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment