Created
June 24, 2017 19:49
-
-
Save d33tah/e9a6a2c8874f7260cf066913d1e86d2a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import bencode | |
from io import BytesIO | |
from struct import unpack | |
from socket import AF_INET, inet_ntop, error as socket_error | |
from socket import gethostbyname, socket, SOCK_DGRAM | |
from unittest import TestCase, main as unittest_main | |
def dht_query(s, ip, port, info_hash): | |
a_bencoded = b''.join([b'd', b'2:id', b'20:', info_hash, b'9:info_hash', | |
b'20:', info_hash, b'e']) | |
all_bencoded = b''.join([b'd', b'1:a', a_bencoded, b'1:q', b'9:get_peers', | |
b'1:t', b'2:0f', b'1:y', b'1:q', b'e']) | |
s.sendto(all_bencoded, (ip, port)) | |
s.settimeout(0.5) | |
try: | |
response_raw = s.recvfrom(1024)[0] | |
except socket_error: # timed out? | |
return [], [] | |
return interpret_dht_response(response_raw) | |
def id_to_ipport(node_id): | |
ip = inet_ntop(AF_INET, node_id[-6:][:4]) | |
port = unpack(">H", node_id[-2:])[0] | |
return ip, port | |
def interpret_dht_response(response_raw): | |
print(repr(response_raw)) | |
response = bencode.bdecode(response_raw) | |
if b'r' not in response: | |
return [], [] | |
found_peers, found_nodes = [], [] | |
for node_id in response[b'r'].get(b'values', []): | |
found_peers += [id_to_ipport(node_id)] | |
for offset in range(0, len(response[b'r'].get(b'nodes', '')), 26): | |
node_id = response[b'r'].get(b'nodes', '')[offset:offset+26] | |
found_nodes += [id_to_ipport(node_id)] | |
return found_peers, found_nodes | |
class SocketMock: | |
def __init__(self, reply): | |
self.reply = reply | |
def sendto(self, *args, **kwargs): | |
return None | |
def settimeout(self, *args, **kwargs): | |
return None | |
def recvfrom(self, *args, **kwargs): | |
return self.reply, None | |
class DHTTest(TestCase): | |
def testNodes(self): | |
ihash = b'\x97\xb5\x80\xa4\xcd\xf5i\x9dbh\x19$\x1exh\xba\x8f\x96Q\x82' | |
r = (b'd2:ip6:M\xed\x0b2\xb5\xca1:rd2:id20:2\xf5NisQ\xffJ\xec)\xcd\xba' | |
b'\xab\xf2\xfb\xe3F|\xc2g5:nodes26:\xfb~\x1c\xef\x8au]\xd6:\xf2!' | |
b'\xed\xaf\x11\xd6\xcf4o\x89\x02mn\x94\xa0\xd8\xb2e1:t2:0f1:y1:r' | |
b'e') | |
found_peers, found_nodes = dht_query(SocketMock(r), None, None, ihash) | |
self.assertEqual(found_peers, []) | |
self.assertEqual(found_nodes, [('109.110.148.160', 55474)]) | |
def testPeers(self): | |
ihash = b'\x97\xb5\x80\xa4\xcd\xf5i\x9dbh\x19$\x1exh\xba\x8f\x96Q\x82' | |
r = (b'd2:ip6:M\xed\x0b2\xb5\xca1:rd2:id20:2\xf5NisQ\xffJ\xec)\xcd\xba' | |
b'\xab\xf2\xfb\xe3F|\xc2g6:valuesl6:mn\x94\xa0\xd8\xb2ee1:t2:0f1:' | |
b'y1:re') | |
found_peers, found_nodes = dht_query(SocketMock(r), None, None, ihash) | |
self.assertEqual(found_peers, [('109.110.148.160', 55474)]) | |
self.assertEqual(found_nodes, []) | |
def system_test(): | |
info_hash = b'\xb4\x15\xc9\x13d>_\xf4\x9f\xe3}0K\xbb^n\x11\xadQ\x01' | |
dht_ips = [(gethostbyname('router.bittorrent.com'), 6881)] | |
dht_sock = socket(AF_INET, SOCK_DGRAM) | |
while dht_ips: | |
node = dht_ips.pop() | |
ip, port = node[0], node[1] | |
try: | |
peers, nodes = dht_query(dht_sock, ip, port, info_hash) | |
except bencode.BTL.BTFailure: | |
continue | |
if peers: | |
print(peers) | |
return | |
dht_ips += nodes | |
if __name__ == '__main__': | |
system_test() | |
unittest_main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment