Created
August 11, 2014 15:32
-
-
Save hellais/a1a63df6e9f4958c9ab0 to your computer and use it in GitHub Desktop.
Enumerate HS descriptors
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from pprint import pprint | |
from base64 import b64decode | |
from twisted.internet import reactor, defer | |
from txtorcon import launch_tor, TorConfig, TorState | |
from twisted.internet.endpoints import TCP4ClientEndpoint | |
from twisted.internet.task import react | |
from twisted.web.client import readBody | |
from txsocksx.http import SOCKS5Agent | |
tor_binary = "/path/to/your/tor/binary/tor" | |
hidden_services = [ | |
"3g2upl4pq6kufc4m.onion" | |
] | |
socks_port = 9997 | |
def http_get_request(hidden_service): | |
torEndpoint = TCP4ClientEndpoint(reactor, '127.0.0.1', socks_port) | |
agent = SOCKS5Agent(reactor, proxyEndpoint=torEndpoint) | |
d = agent.request('GET', 'http://'+hidden_service+".onion") | |
return d | |
def progress_updates(x,y,z): | |
print x,y,z | |
class HiddenServiceDescriptor(object): | |
def __init__(self, hidden_service_descriptor): | |
self.intro_points = [] | |
self.desc = hidden_service_descriptor | |
self.parse_intro_point_data() | |
self.parse_intro_points() | |
def parse_intro_point_data(self): | |
begin = False | |
intro_points = [] | |
for line in self.desc.split("\n"): | |
if line.startswith("introduction-points"): | |
begin = True | |
continue | |
if begin and line.startswith("-----BEGIN MESSAGE-----"): | |
continue | |
elif begin and line.startswith("-----END MESSAGE-----"): | |
break | |
elif begin: | |
intro_points.append(line) | |
self.intro_point_data = b64decode("".join(intro_points)) | |
def parse_intro_points(self): | |
fields = { | |
"introduction-point": None, | |
"ip-address": None, | |
"onion-port": None, | |
"onion-key": None, | |
"service-key": None | |
} | |
last_key = None | |
current_field = fields.copy() | |
for line in self.intro_point_data.split("\n"): | |
field = line.split(" ") | |
if field[0] == "introduction-point" and last_key: | |
self.intro_points.append(current_field) | |
current_field = fields.copy() | |
if field[0] in fields.keys(): | |
last_key = field[0] | |
try: | |
current_field[last_key] = field[1] | |
except IndexError: | |
current_field[last_key] = [] | |
elif last_key: | |
current_field[last_key].append(line) | |
self.intro_points.append(current_field) | |
@defer.inlineCallbacks | |
def state_complete(state): | |
for hidden_service in hidden_services: | |
print "Address: %s" % hidden_service | |
try: | |
hidden_service = hidden_service.replace(".onion", "") | |
yield http_get_request(hidden_service) | |
hidden_service_descriptor = yield state.protocol.get_info_raw("hs/desc/id/" + hidden_service) | |
hs = HiddenServiceDescriptor(hidden_service_descriptor) | |
with open(hidden_service+"-desc.txt", "w+") as f: | |
json.dump(hs.intro_points, f) | |
for ip in hs.intro_points: | |
print "%s" % ip['ip-address'] | |
print "Total intro points: %d" % len(hs.intro_points) | |
except: | |
print "FAIL" | |
def complete(proto): | |
state = TorState(proto.tor_protocol) | |
state.post_bootstrap.addCallback(state_complete) | |
@state.post_bootstrap.addErrback | |
def eb(failure): | |
print failure | |
return state.post_bootstrap | |
config = TorConfig() | |
config.SocksPort = socks_port | |
config.ControlPort = 9998 | |
config.DataDir = '/tmp/tor-hs-enumerate' | |
config.Log = ["info stdout", | |
"info file /tmp/tor-hs-enumerate.log"] | |
d = launch_tor(config, reactor, progress_updates=progress_updates, tor_binary=tor_binary) | |
d.addCallback(complete) | |
reactor.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment