Skip to content

Instantly share code, notes, and snippets.

@da2x
Last active December 18, 2022 08:20
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save da2x/033dad3631f0622b8ccbf7e44b269808 to your computer and use it in GitHub Desktop.
Python 3 program for parsing Tranco ranking lists to discover Dat and IPFS websites.
from os.path import isfile
from urllib.request import urlopen
import concurrent.futures
import socket
import dns.resolver
import time
from urllib.request import urlopen, Request
import multiprocessing as mp
import os
import sys
socket.setdefaulttimeout(25)
datafile = "tranco.csv" # download at https://tranco-list.eu
poolsize = 22
dns_resolv = dns.resolver.Resolver()
dns_resolv.nameservers = ['1.0.0.1', '1.1.1.1', '74.82.42.42', '209.244.0.3', '209.244.0.4', '8.8.8.8']
dns_resolv.search = []
def dnslink_subdomain(domain):
res = []
lis_pos,rawdomain = domain.strip().split(",")
for prefix in ["_dnslink.", "_dnslink.www."]:
domain = prefix + rawdomain + '.'
try:
query = dns_resolv.query(domain, 'TXT')
for answer in query:
if str(answer).lower().startswith('"dnslink=/ip'):
res.append("FOUND:IPFS, #{0}, {1}, {2}".format(list_pos, domain, str(answer)))
except:
pass
return res
def dnstext_domain(domain):
res=[]
list_pos,rawdomain = domain.strip().split(",")
for prefix in ["", "www."]:
domain = prefix + rawdomain + '.'
try:
query = dns_resolv.query(domain, 'TXT')
for answer in query:
if str(answer).lower().startswith('"datkey='):
res.append("FOUND:DAT, #{0}, {1}, {2}".format(list_pos, domain, str(answer)))
if str(answer).lower().startswith('"dnslink=/ip'):
res.append("FOUND:IPFS, #{0}, {1}, {2}".format(list_pos, domain, str(answer)))
except:
pass
return res
def fetch_wellknown(domain):
res=[]
list_pos, rawdomain = domain.strip().split(",")
for prefix in ["", "www."]:
domain = prefix + rawdomain
url = "http://{0}/.well-known/dat".format(domain)
req = Request(url)
req.add_header('User-Agent', 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36')
try:
resp = urlopen(req)
respbody = resp.read()
if b'dat://' in respbody:
filename = "httpdump/{0}.well-known-dat.httpdump".format(domain)
with open(filename, 'wb') as file:
file.write(respbody)
res.append("FOUND:DAT, #{0}, {1}, {2}".format(list_pos, domain, '.well-known/dat'))
except:
pass
return res
def cloudflare_cname(domain):
res=[]
list_pos,domain = domain.strip().split(",")
try:
for prefix in ["www."]:
q_domain = prefix + domain + '.'
query = dns_resolv.query(q_domain, 'CNAME')
for answer in query:
if 'cloudflare-ipfs.com' in str(answer).lower():
res.append("FOUND:IPFS, #{0}, {1}, {2}".format(list_pos, q_domain, str(answer)))
except:
pass
return res
def runner(domain):
res = []
for fun in [cloudflare_cname, dnstext_domain, dnslink_subdomain, fetch_wellknown]:
res.extend(fun(domain))
return res
with mp.Pool(processes=poolsize) as pool:
with open(datafile,"r") as file:
x = pool.imap_unordered(runner , file)
with open("results.txt", "a") as outf:
i=0
for res in x:
i+=1
if (i % 100 == 0):
sys.stdout.write(".")
sys.stdout.flush()
if (i % 1000 == 0):
sys.stdout.write("%d"%(i))
sys.stdout.flush()
for line in res:
outf.write(line + "\n")
outf.flush()
sys.stdout.write("!")
sys.stdout.flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment