Created
April 27, 2017 06:10
-
-
Save zyan0/05686eca633794f28d4d465d80b9f30e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import StringIO | |
import time | |
import pycurl | |
import stem.control | |
from stem.util import term | |
import stem.process | |
# Static exit for us to make 2-hop circuits through. Picking aurora, a | |
# particularly beefy one... | |
# | |
# https://atlas.torproject.org/#details/379FB450010D17078B3766C2273303C358C3A442 | |
EXIT_FINGERPRINT = '379FB450010D17078B3766C2273303C358C3A442' | |
SOCKS_PORT = 7000 | |
CONTROLLER_PORT = 9051 | |
CONNECTION_TIMEOUT = 30 # timeout before we give up on a circuit | |
def query(url): | |
""" | |
Uses pycurl to fetch a site using the proxy on the SOCKS_PORT. | |
""" | |
output = StringIO.StringIO() | |
query = pycurl.Curl() | |
query.setopt(pycurl.URL, url) | |
query.setopt(pycurl.PROXY, 'localhost') | |
query.setopt(pycurl.PROXYPORT, SOCKS_PORT) | |
query.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5_HOSTNAME) | |
query.setopt(pycurl.CONNECTTIMEOUT, CONNECTION_TIMEOUT) | |
query.setopt(pycurl.WRITEFUNCTION, output.write) | |
try: | |
query.perform() | |
return output.getvalue() | |
except pycurl.error as exc: | |
raise ValueError("Unable to reach %s (%s)" % (url, exc)) | |
def scan(controller, path): | |
""" | |
Fetch check.torproject.org through the given path of relays, providing back | |
the time it took. | |
""" | |
circuit_id = controller.new_circuit(path, await_build=True) | |
def attach_stream(stream): | |
if stream.status == 'NEW': | |
controller.attach_stream(stream.id, circuit_id) | |
controller.add_event_listener(attach_stream, stem.control.EventType.STREAM) | |
try: | |
controller.set_conf('__LeaveStreamsUnattached', | |
'1') # leave stream management to us | |
start_time = time.time() | |
check_page = query('https://check.torproject.org/') | |
if 'Congratulations. This browser is configured to use Tor.' not in check_page: | |
raise ValueError("Request didn't have the right content") | |
return time.time() - start_time | |
finally: | |
controller.remove_event_listener(attach_stream) | |
controller.reset_conf('__LeaveStreamsUnattached') | |
def print_bootstrap_lines(line): | |
if "Bootstrapped " in line: | |
print(term.format(line, term.Color.BLUE)) | |
print(term.format("Starting Tor:", term.Attr.BOLD)) | |
tor_process = stem.process.launch_tor_with_config( | |
config={ | |
'SocksPort': str(SOCKS_PORT), | |
'ControlPort': str(CONTROLLER_PORT), | |
'ExitNodes': '{ru}', | |
'__DisablePredictedCircuits': '1', | |
'MaxOnionsPending': '0', | |
'newcircuitperiod': '999999999', | |
'maxcircuitdirtiness': '999999999', | |
}, | |
init_msg_handler=print_bootstrap_lines, | |
completion_percent=80) | |
try: | |
with stem.control.Controller.from_port(port=CONTROLLER_PORT) as controller: | |
controller.authenticate() | |
# relay_fingerprints = [desc.fingerprint for desc in controller.get_network_statuses()] | |
# | |
# for fingerprint in relay_fingerprints: | |
# try: | |
# time_taken = scan(controller, [fingerprint, EXIT_FINGERPRINT]) | |
# print('%s => %0.2f seconds' % (fingerprint, time_taken)) | |
# except Exception as exc: | |
# print('%s => %s' % (fingerprint, exc)) | |
circuit_id = controller.new_circuit( | |
path=['60E6E78926A45A77C1CB6BCC59D459083CB8530F', '1A19C6822777515982DAD87CD2AA65B071305A1E'], await_build=True) | |
print circuit_id | |
except Exception as e: | |
print e | |
tor_process.kill() | |
finally: | |
tor_process.kill() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment