Skip to content

Instantly share code, notes, and snippets.

@itdaniher
Created November 17, 2017 04:59
Show Gist options
  • Save itdaniher/d9df12abc5eca08cb225e59ba4de65f8 to your computer and use it in GitHub Desktop.
Save itdaniher/d9df12abc5eca08cb225e59ba4de65f8 to your computer and use it in GitHub Desktop.
Multi-process Python 2.7 proxy checker
import sockschain
import urllib2
import time
import random
import re
import json
import requests
import pyping
import multiprocessing
import atexit
import signal
import pygeoip
import sys
import socket
import dns.resolver
# Python 2 only: force UTF-8 as the interpreter's default string encoding.
# NOTE(review): reload(sys) is presumably needed because setdefaultencoding
# is removed from sys at startup -- confirm before porting.
reload(sys)
sys.setdefaultencoding("utf-8")
# Result log, named "<unix timestamp>+proxies.log"; the third argument (0)
# is the buffering size, i.e. writes are unbuffered.
fout = open(str(int(time.time())) + '+proxies.log', 'w', 0)
# GeoIP databases opened via relative paths: check_proxy uses `gic` for the
# country lookup and `gii` for the organisation lookup.
gic = pygeoip.GeoIP('GeoIPCity.dat')
gii = pygeoip.GeoIP('GeoIPISP.dat')
def getter(url):
    """Fetch *url* with a browser-like User-Agent header.

    Returns the response object from ``urllib2.urlopen`` when its status
    code is 200. Returns None for any other status or when the request
    raises, so callers can treat a falsy result as "fetch failed".
    """
    try:
        response = urllib2.urlopen(urllib2.Request(
            url, headers={'User-Agent': 'Mozilla/5.0'}
        ))
        if response.code == 200:
            return response
    except Exception:
        # Best-effort fetch: unreliable proxies fail in many ways and every
        # failure just means "no result". Catching Exception rather than
        # using a bare except lets KeyboardInterrupt/SystemExit propagate.
        pass
    return None
# Download the candidate list (split on newlines, one entry per line) and
# drop blank entries. This request runs before socket.socket is replaced
# further down.
proxies = requests.get('http://multiproxy.org/txt_all/proxy.txt',
headers={'User-Agent': 'Mozilla/5.0'}).content.split('\n')
proxies = [p.strip() for p in proxies if p.strip()]
print(len(proxies))
# Randomise the order in which candidates are checked.
random.shuffle(proxies)
# Configure sockschain with the local Tor endpoint as its default proxy and
# make sockschain's socket class the process-wide socket.socket.
# NOTE(review): setdefaultproxy() with no arguments presumably clears any
# previously configured chain -- confirm against sockschain.
sockschain.setdefaultproxy()
sockschain.adddefaultproxy(*sockschain.parseproxy('tor://localhost:9050/'))
socket.socket = sockschain.socksocket
def chained_tor(http_proxy):
    """Build the two-hop proxy chain used to test *http_proxy*.

    The first hop is the local Tor endpoint, with user "a" and the current
    Unix time (whole seconds) as the password; the second hop is the
    candidate itself as an ``http://host:port/`` URL.

    NOTE(review): the changing password is presumably there to obtain a
    separate Tor circuit per check -- confirm.
    """
    tor_hop = 'tor://a:%d@localhost:9050/' % int(time.time())
    candidate_hop = 'http://' + http_proxy + '/'
    return [tor_hop, candidate_hop]
# DNS blacklist zones; check_proxy passes each one to check_ip together with
# the IP address reported through the proxy.
bls = ['tor.dnsbl.sectoor.de', 'db.wpbl.info', 'cbl.abuseat.org',
'b.barracudacentral.org', 'misc.dnsbl.sorbs.net', 'zen.spamhaus.org']
# "What is my IP" endpoints; check_proxy picks one at random per proxy and
# ip_scraper extracts the first IPv4 address from its response.
# NOTE(review): several URLs embed API tokens in source -- consider moving
# them to configuration.
ip_sources = ['https://ipinfo.io/ip', 'https://lumtest.com/myip.json', 'https://ipinfo.io?token=c0ef293720c323', 'https://ipinfo.io?token=2855c965e609d1',
'https://ipinfo.io?token=b10555a836c3e0', 'https://ipinfo.io?token=2d021cab2e7f84', 'https://wtfismyip.com/json', 'https://canhazip.com/']
# Matches a dotted-quad IPv4 address (octet values are not range-checked).
# Raw string so that "\." reaches the regex engine as written instead of
# relying on Python passing an unrecognised string escape through.
ip_regex = re.compile(r'(?:[0-9]{1,3}\.){3}[0-9]{1,3}')
def ip_scraper(url):
    """Return the first IPv4 address found in the body served at *url*.

    Returns None when the fetch fails (``getter`` returned a falsy value)
    or when the body contains no IPv4-looking text. The original indexed
    ``findall(...)[0]`` and raised IndexError in the latter case.
    """
    response = getter(url)
    if not response:
        return None
    # The pattern has no capturing groups, so search().group(0) is the same
    # string findall()[0] would have produced.
    match = ip_regex.search(response.read())
    if match is None:
        return None
    return match.group(0)
def _chunk(l, x): return [l[i:i + x] for i in xrange(0, len(l), x)]
def check_ip(ip, bl):
    """Look up *ip* in the DNS blacklist zone *bl*.

    The query name is the dot-separated parts of *ip* in reverse order,
    followed by the zone name.

    Returns ``(bl, a_record, txt_record)`` when the A lookup succeeds, i.e.
    the address is listed; ``txt_record`` is None when the zone has no TXT
    record for the entry. Returns None when the A lookup raises NXDOMAIN,
    NoAnswer or EmptyLabel.
    """
    not_listed = (dns.resolver.NXDOMAIN, dns.name.EmptyLabel,
                  dns.resolver.NoAnswer)
    resolver = dns.resolver.Resolver()
    query = '.'.join(reversed(str(ip).split("."))) + "." + bl
    try:
        answers = resolver.query(query, "A")
    except not_listed:
        return None
    # The TXT record is only an optional explanation. Looked up separately so
    # that a missing TXT cannot turn a positive A answer into "not listed",
    # which is what happened when both queries shared one try block.
    try:
        reason = resolver.query(query, "TXT")[0]
    except not_listed:
        reason = None
    return (bl, answers[0], reason)
def check_proxy(proxy):
    """Probe one HTTP proxy through Tor and describe it if it looks usable.

    Steps:
      1. Point sockschain's default chain at Tor followed by *proxy* and
         install sockschain's socket class as ``socket.socket``.
      2. Ask a randomly chosen service from ``ip_sources`` for the IP
         address seen through that chain.
      3. Reject the address if ``check_ip`` reports it for any zone in
         ``bls``.
      4. Reset the chain, restore the regular socket class, then ping the
         address and look up its country and organisation.

    Returns a 4-tuple of strings ``(rtt, proxy, country, org)``: ``rtt`` is
    the integer part of the average round-trip time left-padded with zeros
    to 4 characters, ``proxy`` is right-padded with spaces to 20
    characters. Returns None when no IP could be obtained or the IP is
    blacklisted.
    """
    sockschain.setdefaultproxy()
    for sub_proxy in chained_tor(proxy):
        sockschain.adddefaultproxy(*sockschain.parseproxy(sub_proxy))
    socket.socket = sockschain.socksocket

    ip = ip_scraper(random.choice(ip_sources))
    if not ip:
        return None
    # any() stops at the first zone that lists the address; the original
    # loop kept iterating over the remaining zones after a hit.
    if any(check_ip(ip, bl) for bl in bls):
        return None

    record = gic.record_by_addr(ip)
    # Addresses without a GeoIP record are reported as 'US'.
    country = record['country_code'] if record else 'US'

    sockschain.setdefaultproxy()
    # Restore the interpreter's own socket class before pinging. The
    # original did `import socket as socket2; socket.socket = socket2.socket`,
    # but socket2 is the very same module object, so that was a no-op.
    # On Python 2.7 socket.SocketType is the stock socket class.
    # NOTE(review): assumes nothing else rebinds socket.SocketType.
    socket.socket = socket.SocketType
    dur = pyping.ping(ip, count=2, timeout=2000).avg_rtt
    if not dur:
        # No round-trip time available: fall back to a placeholder value.
        dur = '1000'
    org = gii.org_by_addr(ip)
    if not org:
        org = 'None'
    return dur.split('.')[0].rjust(4, '0'), proxy.ljust(20, ' '), country, org
def check_proxies(proxies):
    """Check each proxy in *proxies* and record the usable ones.

    For every proxy whose ``check_proxy`` result is truthy, the result
    fields are joined with single spaces, written as one line to the
    module-level log file ``fout`` and echoed to stdout.

    A failure while checking one proxy is reported on stderr and the loop
    moves on. Previously the exception ended the worker and the rest of the
    chunk was never checked.
    """
    for proxy in proxies:
        try:
            res = check_proxy(proxy)
        except Exception as exc:
            sys.stderr.write('check failed for %s: %r\n' % (proxy, exc))
            continue
        if res:
            r = ' '.join(res)
            fout.write(r.strip() + u'\n')
            # Function-call form, consistent with the print(...) used at
            # module level; prints the same text on Python 2 for one string.
            print(r)
@atexit.register
def cleanup(signal=None, frame=None):
    """Terminate every child process that is still alive.

    Registered to run at interpreter exit. Both parameters are optional and
    ignored, so the function can also be called with the two arguments a
    signal handler receives.
    """
    live_children = multiprocessing.active_children()
    for child in live_children:
        child.terminate()
# Ctrl-C terminates the workers through cleanup().
signal.signal(signal.SIGINT, cleanup)

# One worker process per batch of five proxies.
processes = []
for batch in _chunk(proxies, 5):
    worker = multiprocessing.Process(target=check_proxies, args=(batch,))
    processes.append(worker)
    worker.start()

# Poll every 10 seconds until no child process is left running.
while multiprocessing.active_children():
    time.sleep(10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment