Skip to content

Instantly share code, notes, and snippets.

@mIcHyAmRaNe
Created May 23, 2019 17:24
Show Gist options
  • Save mIcHyAmRaNe/a0febdab4a1649fcee6160a570d2747a to your computer and use it in GitHub Desktop.
Save mIcHyAmRaNe/a0febdab4a1649fcee6160a570d2747a to your computer and use it in GitHub Desktop.
from os.path import curdir, realpath
from os import makedirs
import stem.process
import stem.control
import requests
import socks
import socket
import json
# Port of Tor's local SOCKS listener. The control port is always the next
# port up (SOCKS_PORT + 1); the controller later connects to that port.
SOCKS_PORT = 9060

# torrc options passed to stem when launching Tor. Every value is a string.
# Tor's state and its auth cookie are placed under ./tor, resolved against
# the working directory at import time.
config = {
    'SocksPort': str(SOCKS_PORT),
    'ControlPort': str(SOCKS_PORT + 1),
    'DataDirectory': realpath(curdir) + '/tor/DataDirectory',
    'CookieAuthFile': realpath(curdir) + '/tor/control.authcookie',
    'CookieAuthentication': '1',
    'CookieAuthFileGroupReadable': '1',
    # Streams are attached to circuits by hand (see stream_handler).
    '__LeaveStreamsUnattached': '1',
}
def init_msg_handler(line):
    """Echo one line of Tor's startup output to stdout.

    Passed as the ``init_msg_handler`` callback when Tor is launched, so
    bootstrap progress is visible while the process starts.

    Args:
        line: A single line of text emitted by Tor during initialisation.
    """
    print(line)
# Resolver that was installed before enable_socks_proxy() replaced it.
# None while the proxy is not enabled.
_saved_getaddrinfo = None


def _passthrough_getaddrinfo(host, port, *args, **kwargs):
    """Return one fake IPv4/TCP entry for (host, port) without any DNS lookup.

    The host is handed back unresolved. Extra positional and keyword
    arguments (family, type, proto, flags) are accepted and ignored so that
    every calling convention of ``socket.getaddrinfo`` keeps working.
    """
    return [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (host, port))]


def enable_socks_proxy():
    """Route all sockets created from now on through the local Tor SOCKS port.

    Monkeypatches the ``socket`` module process-wide:

    * ``socket.socket`` becomes ``socks.socksocket``, pointed at
      ``127.0.0.1:SOCKS_PORT`` as a SOCKS5 proxy.
    * ``socket.getaddrinfo`` is replaced by a stub that performs no local
      lookup (presumably so host names reach the proxy unresolved instead of
      leaking through the system resolver).

    The previous resolver is remembered so disable_socks_proxy() can put it
    back. Calling this twice in a row does not lose the original.
    """
    global _saved_getaddrinfo
    socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, '127.0.0.1', SOCKS_PORT)
    socket.socket = socks.socksocket
    # Only remember the resolver on the first call; on a repeated call
    # socket.getaddrinfo is already our stub.
    if _saved_getaddrinfo is None:
        _saved_getaddrinfo = socket.getaddrinfo
    socket.getaddrinfo = _passthrough_getaddrinfo


def disable_socks_proxy():
    """Undo enable_socks_proxy(): restore the plain socket class and resolver.

    Safe to call when the proxy was never enabled; in that case only
    ``socket.socket`` is reset.
    """
    global _saved_getaddrinfo
    socket.socket = socks._orgsocket
    if _saved_getaddrinfo is not None:
        socket.getaddrinfo = _saved_getaddrinfo
        _saved_getaddrinfo = None
def get_new_circ():
    """Build a new Tor circuit and return its circuit record.

    Uses the module-level ``controller``. ``await_build=True`` is passed so
    the call blocks until the circuit is built, after which the circuit is
    looked up again by id.

    Returns:
        Whatever ``controller.get_circuit`` returns for the new circuit id.
    """
    return controller.get_circuit(controller.new_circuit(await_build=True))
def circ_info(circ):
    """Render a circuit as a small text tree, one line per hop.

    The first line is ``Circuit <id> (<purpose>)``. Each following line shows
    a relay's fingerprint, country code, IP address and nickname. Relays are
    looked up through the module-level ``controller``.

    Args:
        circ: Circuit object exposing ``id``, ``purpose`` and ``path``, where
            ``path`` is a sequence of ``(fingerprint, nickname)`` pairs.

    Returns:
        The formatted multi-line string, terminated by a newline.
    """
    lines = [f'Circuit {circ.id} ({circ.purpose})']
    last_index = len(circ.path) - 1
    for path_index, (fingerprint, nickname) in enumerate(circ.path):
        # The final hop closes the tree; all others use a tee.
        div = '┗' if path_index == last_index else '┣'
        desc = controller.get_network_status(fingerprint, None)
        if desc:
            address = desc.address
            location = controller.get_info(f'ip-to-country/{address}', 'unknown')
        else:
            # Without a descriptor there is no address to geolocate, so skip
            # the control-port round trip that could only yield the default.
            address = location = 'unknown'
        lines.append(f' {div}━ {fingerprint} [{location}] {address.ljust(15)} {nickname}')
    return '\n'.join(lines) + '\n'
def stream_handler(ctrl, circ, func):
    """Run ``func()`` while new streams are attached to ``circ``.

    A STREAM event listener is registered on ``ctrl`` for the duration of
    the call. Each event is printed, and any stream whose status is 'NEW' is
    attached to ``circ``. The listener is removed again even when ``func``
    raises.

    Args:
        ctrl: Controller used to register the listener and attach streams.
        circ: Circuit whose ``id`` new streams are attached to.
        func: Zero-argument callable performing the network activity.

    Returns:
        Whatever ``func()`` returns.
    """
    def attach_stream(stream):
        # Diagnostic output for every stream event, not just new streams.
        print(stream)
        print(stream.status, stream.id, stream.circ_id, stream.remote_reason)
        if stream.status != 'NEW':
            return
        ctrl.attach_stream(stream.id, circ.id)

    ctrl.add_event_listener(attach_stream, stem.control.EventType.STREAM)
    try:
        return func()
    finally:
        ctrl.remove_event_listener(attach_stream)
# Make sure Tor's data directory exists before the process is launched.
makedirs(config.get('DataDirectory'), exist_ok=True)

# Start a private Tor instance. With take_ownership=True Tor shuts itself
# down if this Python process dies or its owning controller disconnects.
tor_process = stem.process.launch_tor_with_config(
    config=config,
    init_msg_handler=init_msg_handler,
    take_ownership=True,
)
try:
    # Open the control connection BEFORE enable_socks_proxy(): that call
    # replaces socket.socket globally, so any socket created afterwards
    # would be a SOCKS socket.
    controller = stem.control.Controller.from_port(port=SOCKS_PORT + 1)
    try:
        controller.authenticate()
        enable_socks_proxy()

        # Ten requests, each over its own freshly built circuit.
        for _ in range(10):
            new_circ = get_new_circ()
            print(circ_info(new_circ))
            r = stream_handler(
                controller,
                new_circ,
                func=lambda: requests.get(url='https://httpbin.org/anything'),
            )
            try:
                print(f'\n{json.dumps(r.json(), indent=4)}')
            except ValueError:
                # The decode error raised by r.json() depends on the requests
                # version and JSON backend; all variants derive from
                # ValueError, so fall back to the raw body for any of them.
                print(r.text)
    finally:
        controller.close()
finally:
    # Stop Tor explicitly instead of relying on interpreter exit.
    tor_process.kill()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment