Last active
December 18, 2022 08:20
Star
You must be signed in to star a gist
Python 3 program for parsing Tranco ranking lists to discover Dat and IPFS websites.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from os.path import isfile | |
from urllib.request import urlopen | |
import concurrent.futures | |
import socket | |
import dns.resolver | |
import time | |
from urllib.request import urlopen, Request | |
import multiprocessing as mp | |
import os | |
import sys | |
# Default timeout (seconds) for every socket created after this call.
socket.setdefaulttimeout(25)

# Input ranking list, one "rank,domain" entry per line.
datafile = "tranco.csv"  # download at https://tranco-list.eu

# Number of worker processes scanning the list in parallel.
poolsize = 22

# Shared resolver with an explicit nameserver list and no search domains.
dns_resolv = dns.resolver.Resolver()
dns_resolv.search = []
dns_resolv.nameservers = [
    '1.0.0.1',
    '1.1.1.1',
    '74.82.42.42',
    '209.244.0.3',
    '209.244.0.4',
    '8.8.8.8',
]
def dnslink_subdomain(domain, resolver=None):
    """Look for IPFS DNSLink TXT records on the ``_dnslink`` subdomains.

    Args:
        domain: One ranking-list line in ``rank,domain`` form; surrounding
            whitespace (including the trailing newline) is ignored.
        resolver: Object exposing ``query(name, rdtype)``. Defaults to the
            module-level ``dns_resolv``.

    Returns:
        A list of ``"FOUND:IPFS, #<rank>, <queried name>, <TXT record>"``
        strings, one per matching record. The list is empty when nothing
        matches, when a lookup fails, or when the line is not in
        ``rank,domain`` form.
    """
    parts = domain.strip().split(",")
    if len(parts) != 2 or not parts[1]:
        # Malformed line: skip it instead of aborting the whole scan.
        return []
    list_pos, rawdomain = parts

    if resolver is None:
        resolver = dns_resolv

    res = []
    for prefix in ("_dnslink.", "_dnslink.www."):
        # Trailing dot: ask for the name exactly as written.
        qname = prefix + rawdomain + "."
        try:
            answers = resolver.query(qname, "TXT")
        except Exception:
            # Best-effort scan: failed lookups are expected for most names.
            # KeyboardInterrupt/SystemExit are deliberately not caught.
            continue
        for answer in answers:
            text = str(answer)
            if text.lower().startswith('"dnslink=/ip'):
                res.append(
                    "FOUND:IPFS, #{0}, {1}, {2}".format(list_pos, qname, text)
                )
    return res
def dnstext_domain(domain, resolver=None):
    """Look for Dat and IPFS markers in the TXT records of a domain.

    Both the bare domain and its ``www.`` variant are queried.

    Args:
        domain: One ranking-list line in ``rank,domain`` form; surrounding
            whitespace (including the trailing newline) is ignored.
        resolver: Object exposing ``query(name, rdtype)``. Defaults to the
            module-level ``dns_resolv``.

    Returns:
        A list of ``"FOUND:DAT, ..."`` / ``"FOUND:IPFS, ..."`` strings, each
        carrying the rank, the queried name and the matching TXT record.
        The list is empty when nothing matches, when a lookup fails, or
        when the line is not in ``rank,domain`` form.
    """
    parts = domain.strip().split(",")
    if len(parts) != 2 or not parts[1]:
        # Malformed line: skip it instead of aborting the whole scan.
        return []
    list_pos, rawdomain = parts

    if resolver is None:
        resolver = dns_resolv

    res = []
    for prefix in ("", "www."):
        # Trailing dot: ask for the name exactly as written.
        qname = prefix + rawdomain + "."
        try:
            answers = resolver.query(qname, "TXT")
        except Exception:
            # Best-effort scan: failed lookups are expected for most names.
            # KeyboardInterrupt/SystemExit are deliberately not caught.
            continue
        for answer in answers:
            text = str(answer)
            lowered = text.lower()
            if lowered.startswith('"datkey='):
                res.append(
                    "FOUND:DAT, #{0}, {1}, {2}".format(list_pos, qname, text)
                )
            if lowered.startswith('"dnslink=/ip'):
                res.append(
                    "FOUND:IPFS, #{0}, {1}, {2}".format(list_pos, qname, text)
                )
    return res
def fetch_wellknown(domain):
    """Fetch ``/.well-known/dat`` over HTTP and report Dat addresses.

    Both the bare domain and its ``www.`` variant are requested. When the
    response body contains ``dat://``, a hit is recorded and the body is
    also saved under ``httpdump/``.

    Args:
        domain: One ranking-list line in ``rank,domain`` form; surrounding
            whitespace (including the trailing newline) is ignored.

    Returns:
        A list of ``"FOUND:DAT, #<rank>, <host>, .well-known/dat"`` strings;
        empty when nothing matches, when a request fails, or when the line
        is not in ``rank,domain`` form.
    """
    parts = domain.strip().split(",")
    if len(parts) != 2 or not parts[1]:
        # Malformed line: skip it instead of aborting the whole scan.
        return []
    list_pos, rawdomain = parts

    res = []
    for prefix in ("", "www."):
        host = prefix + rawdomain
        try:
            req = Request("http://{0}/.well-known/dat".format(host))
            req.add_header('User-Agent', 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36')
            # Context manager closes the connection even if read() fails.
            with urlopen(req) as resp:
                respbody = resp.read()
        except Exception:
            # Best-effort scan: unreachable hosts and HTTP errors are
            # expected. KeyboardInterrupt/SystemExit are not caught.
            continue

        if b'dat://' not in respbody:
            continue

        # Record the hit first so a failed dump cannot lose the finding.
        res.append("FOUND:DAT, #{0}, {1}, {2}".format(list_pos, host, '.well-known/dat'))

        filename = "httpdump/{0}.well-known-dat.httpdump".format(host)
        try:
            os.makedirs("httpdump", exist_ok=True)
            with open(filename, 'wb') as dump:
                dump.write(respbody)
        except OSError:
            # The dump is only a convenience copy of the response body.
            pass
    return res
def cloudflare_cname(domain, resolver=None):
    """Check whether ``www.<domain>`` is a CNAME to ``cloudflare-ipfs.com``.

    Args:
        domain: One ranking-list line in ``rank,domain`` form; surrounding
            whitespace (including the trailing newline) is ignored.
        resolver: Object exposing ``query(name, rdtype)``. Defaults to the
            module-level ``dns_resolv``.

    Returns:
        A list of ``"FOUND:IPFS, #<rank>, <queried name>, <CNAME target>"``
        strings; empty when nothing matches, when the lookup fails, or when
        the line is not in ``rank,domain`` form.
    """
    parts = domain.strip().split(",")
    if len(parts) != 2 or not parts[1]:
        # Malformed line: skip it instead of aborting the whole scan.
        return []
    list_pos, rawdomain = parts

    if resolver is None:
        resolver = dns_resolv

    res = []
    for prefix in ("www.",):
        # Trailing dot: ask for the name exactly as written.
        q_domain = prefix + rawdomain + "."
        try:
            answers = resolver.query(q_domain, "CNAME")
        except Exception:
            # Best-effort scan: failed lookups are expected for most names.
            # KeyboardInterrupt/SystemExit are deliberately not caught.
            continue
        for answer in answers:
            target = str(answer)
            if 'cloudflare-ipfs.com' in target.lower():
                res.append(
                    "FOUND:IPFS, #{0}, {1}, {2}".format(list_pos, q_domain, target)
                )
    return res
def runner(domain):
    """Run every probe on one ranking-list line and merge their hits.

    The probes run in a fixed order: CNAME check, TXT records on the
    domain, TXT records on the ``_dnslink`` subdomains, then the HTTP
    ``.well-known`` fetch. Their result lists are concatenated in that
    order.
    """
    probes = (cloudflare_cname, dnstext_domain, dnslink_subdomain, fetch_wellknown)
    return [hit for probe in probes for hit in probe(domain)]
def main():
    """Scan every line of ``datafile`` in parallel and log the findings.

    Each line is handed to ``runner`` in a pool of ``poolsize`` worker
    processes. Findings are appended to ``results.txt`` as they arrive, in
    completion order rather than list order. Progress goes to stdout: a
    ``.`` every 100 domains, the running count every 1000, and a ``!`` for
    every finding written.
    """
    with mp.Pool(processes=poolsize) as pool:
        with open(datafile, "r") as ranking:
            results = pool.imap_unordered(runner, ranking)
            with open("results.txt", "a") as outf:
                for count, found in enumerate(results, start=1):
                    if count % 100 == 0:
                        sys.stdout.write(".")
                        sys.stdout.flush()
                    if count % 1000 == 0:
                        sys.stdout.write("%d" % (count))
                        sys.stdout.flush()
                    for line in found:
                        outf.write(line + "\n")
                        # Flush per finding so an interrupted run keeps
                        # everything discovered so far.
                        outf.flush()
                        sys.stdout.write("!")
                        sys.stdout.flush()


# Guard the entry point: without it, start methods that re-import this
# module in each worker (spawn) would try to create the pool again.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment