Skip to content

Instantly share code, notes, and snippets.

@hellais
Created August 11, 2014 15:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hellais/a1a63df6e9f4958c9ab0 to your computer and use it in GitHub Desktop.
Save hellais/a1a63df6e9f4958c9ab0 to your computer and use it in GitHub Desktop.
Enumerate HS descriptors
import json
from pprint import pprint
from base64 import b64decode
from twisted.internet import reactor, defer
from txtorcon import launch_tor, TorConfig, TorState
from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.internet.task import react
from twisted.web.client import readBody
from txsocksx.http import SOCKS5Agent
tor_binary = "/path/to/your/tor/binary/tor"
hidden_services = [
"3g2upl4pq6kufc4m.onion"
]
socks_port = 9997
def http_get_request(hidden_service):
torEndpoint = TCP4ClientEndpoint(reactor, '127.0.0.1', socks_port)
agent = SOCKS5Agent(reactor, proxyEndpoint=torEndpoint)
d = agent.request('GET', 'http://'+hidden_service+".onion")
return d
def progress_updates(x,y,z):
print x,y,z
class HiddenServiceDescriptor(object):
def __init__(self, hidden_service_descriptor):
self.intro_points = []
self.desc = hidden_service_descriptor
self.parse_intro_point_data()
self.parse_intro_points()
def parse_intro_point_data(self):
begin = False
intro_points = []
for line in self.desc.split("\n"):
if line.startswith("introduction-points"):
begin = True
continue
if begin and line.startswith("-----BEGIN MESSAGE-----"):
continue
elif begin and line.startswith("-----END MESSAGE-----"):
break
elif begin:
intro_points.append(line)
self.intro_point_data = b64decode("".join(intro_points))
def parse_intro_points(self):
fields = {
"introduction-point": None,
"ip-address": None,
"onion-port": None,
"onion-key": None,
"service-key": None
}
last_key = None
current_field = fields.copy()
for line in self.intro_point_data.split("\n"):
field = line.split(" ")
if field[0] == "introduction-point" and last_key:
self.intro_points.append(current_field)
current_field = fields.copy()
if field[0] in fields.keys():
last_key = field[0]
try:
current_field[last_key] = field[1]
except IndexError:
current_field[last_key] = []
elif last_key:
current_field[last_key].append(line)
self.intro_points.append(current_field)
@defer.inlineCallbacks
def state_complete(state):
for hidden_service in hidden_services:
print "Address: %s" % hidden_service
try:
hidden_service = hidden_service.replace(".onion", "")
yield http_get_request(hidden_service)
hidden_service_descriptor = yield state.protocol.get_info_raw("hs/desc/id/" + hidden_service)
hs = HiddenServiceDescriptor(hidden_service_descriptor)
with open(hidden_service+"-desc.txt", "w+") as f:
json.dump(hs.intro_points, f)
for ip in hs.intro_points:
print "%s" % ip['ip-address']
print "Total intro points: %d" % len(hs.intro_points)
except:
print "FAIL"
def complete(proto):
state = TorState(proto.tor_protocol)
state.post_bootstrap.addCallback(state_complete)
@state.post_bootstrap.addErrback
def eb(failure):
print failure
return state.post_bootstrap
config = TorConfig()
config.SocksPort = socks_port
config.ControlPort = 9998
config.DataDir = '/tmp/tor-hs-enumerate'
config.Log = ["info stdout",
"info file /tmp/tor-hs-enumerate.log"]
d = launch_tor(config, reactor, progress_updates=progress_updates, tor_binary=tor_binary)
d.addCallback(complete)
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment