Skip to content

Instantly share code, notes, and snippets.

@a-moss
Created September 9, 2016 03:41
Show Gist options
  • Save a-moss/1578eb07b2570b5d97d85b1e93e81cc8 to your computer and use it in GitHub Desktop.
Save a-moss/1578eb07b2570b5d97d85b1e93e81cc8 to your computer and use it in GitHub Desktop.
import cfscrape
import json
import sys
import argparse
import requests
import logging
import time
import random
import urllib2
from pgoapi import PGoApi
from pgoapi.exceptions import ServerSideRequestThrottlingException
from queue import Queue
from threading import Thread
# Ignore countries
ignore = [ 'China' ]
##### END OF CONFIGURATION #####
logging.basicConfig(format='%(asctime)s %(message)s')
log = logging.getLogger(__name__)
parser = argparse.ArgumentParser()
parser.add_argument("-f", "--proxy-file", help="Text file containing proxies. One proxy per line like http://x.x.x.x:3128")
parser.add_argument("-o", "--output-file", help="Output file with working proxies", default="working_proxies.txt")
parser.add_argument("--proxychains", help="Output in proxychains-ng format. Something like 'http x.x.x.x 3128'", action='store_true')
parser.add_argument("-t", "--timeout", help="Proxy timeout. Default is 15 seconds.", default=15, type=float)
parser.add_argument("-v", "--verbose", help="Run in the verbose mode.", action='store_true')
parser.add_argument("-u", "--username", help="Username if you want to test account connectivity")
parser.add_argument("-p", "--password", help="Password if you want to test account connectivity")
args = parser.parse_args()
working_proxies = []
def check_pogo_login(proxy, username, password):
try:
api = PGoApi()
api.set_proxy({'http': proxy, 'https': proxy})
api.set_position(40.7127837, -74.005941, 0.0)
api.login('ptc', username, password)
time.sleep(1)
req = api.create_request()
req.get_inventory()
response = req.call()
if response['status_code'] == 3:
log.error("The account you're using to test is banned. Try another one.")
sys.exit(1)
except ServerSideRequestThrottlingException as e:
secs = random.randrange(700,1100) / 100;
log.debug("[ %s\t] Server side throttling, Waiting %s seconds!",proxy, secs);
time.sleep(secs)
check_pogo_login(proxy, username, password)
def check_single_proxy(proxy_queue, timeout, working_proxies):
proxy_test_url = 'https://sso.pokemon.com/'
proxy = proxy_queue.get_nowait()
if proxy:
try:
proxy_response = requests.get(proxy_test_url, proxies={'http': proxy, 'https': proxy}, timeout=timeout)
if proxy_response.status_code == 200:
# Test logging in
if args.username and args.password:
check_pogo_login(proxy, args.username, args.password)
log.info("[ %s\t] Ready for PokemonGo!",proxy)
proxy_queue.task_done()
working_proxies.append(proxy)
return True
else:
proxy_error = "Invalid Status Code - " + str(proxy_response.status_code)
except requests.ConnectTimeout:
proxy_queue.task_done()
return False
except requests.ConnectionError:
proxy_queue.task_done()
return False
except Exception as e:
proxy_error = e
else:
proxy_error = "Empty Proxy."
log.info("[ %s\t] Error: %s",proxy,proxy_error)
proxy_queue.task_done()
return False
def save_to_file( proxies, file ):
file = open( file, "w" )
file.truncate()
for proxy in proxies:
if args.proxychains:
# Split the protocol
protocol, address = proxy.split("://",2)
# address = proxy.split("://")[1]
# Split the port
ip, port = address.split(":",2)
# Write to file
file.write( protocol + " " + ip + " " + port + "\n")
else:
file.write(proxy + "\n")
def getWebData(url):
opener = urllib2.build_opener()
opener.addheaders = [('User-agent', 'Mozilla/5.0')]
data = opener.open(url).readlines()
opener.close()
return data
##### proxyhttp.net #####
def getProxyHttp():
page1 = "http://proxyhttp.net/free-list/anonymous-server-hide-ip-address#proxylist"
page2 = "http://proxyhttp.net/free-list/anonymous-server-hide-ip-address/2#proxylist"
page3 = "http://proxyhttp.net/free-list/anonymous-server-hide-ip-address/3#proxylist"
page4 = "http://proxyhttp.net/free-list/anonymous-server-hide-ip-address/4#proxylist"
page5 = "http://proxyhttp.net/free-list/anonymous-server-hide-ip-address/5#proxylist"
page6 = "http://proxyhttp.net/free-list/anonymous-server-hide-ip-address/6#proxylist"
page7 = "http://proxyhttp.net/free-list/anonymous-server-hide-ip-address/7#proxylist"
page8 = "http://proxyhttp.net/free-list/anonymous-server-hide-ip-address/8#proxylist"
page9 = "http://proxyhttp.net/free-list/anonymous-server-hide-ip-address/9#proxylist"
result = parseSocksListPage(page1) + \
parseSocksListPage(page2) + \
parseSocksListPage(page3) + \
parseSocksListPage(page4) + \
parseSocksListPage(page5) + \
parseSocksListPage(page6) + \
parseSocksListPage(page7) + \
parseSocksListPage(page8) + \
parseSocksListPage(page9)
return result
##### sockslist.net #####
def getSocksList():
page1 = "http://sockslist.net/proxy/server-socks-hide-ip-address#proxylist"
page2 = "http://sockslist.net/proxy/server-socks-hide-ip-address/2#proxylist"
page3 = "http://sockslist.net/proxy/server-socks-hide-ip-address/3#proxylist"
result = parseSocksListPage(page1) + parseSocksListPage(page2) + parseSocksListPage(page3)
return result
def parseSocksListPage(url):
ipport = {}
ip = ""
port = ""
webData = getWebData(url)
for line in webData:
if len(line) > 0:
if "^" in line and ";" in line and " = " in line:
xorDict = crazyXORdecoding(line.strip())
else:
if "/check?i" in line and ip == "":
ip = line.split("=")[1].split(":")[0]
if ">check</a>" in line and ip != "":
port = line.split("(")[1].split(")")[0]
ipport[ip] = port
ip = ""
port = ""
#print(ipport)
finalipport = {}
for key in ipport:
portparts = ipport[key].split("^")
translateparts = []
for i in portparts:
if i.isdigit():
translateparts.append(int(i))
else:
translateparts.append(int(xorDict[i]))
total = translateparts[0]
for j in range(1, len(translateparts)):
total = total^translateparts[j]
finalipport[key] = str(total)
result = []
for key in finalipport:
result.append(key+":"+finalipport[key])
return result
def crazyXORdecoding(line):
"""
Recursion is for losers
"""
xorDict = {}
alist1 = line.split(";")
for i in alist1:
if "=" in i:
xorDict[i.split(" = ")[0]] = i.split(" = ")[1]
for i in xorDict:
recursiveXORdecode(xorDict, i)
return xorDict
def recursiveXORdecode(xorDict, akey):
"""
Errr... I meant for winners!
"""
if akey.isdigit():
return akey
avalue = xorDict[akey]
if avalue.isdigit():
return avalue
elif "^" in avalue:
avalue1, avalue2 = avalue.split("^")
answer = str(int(recursiveXORdecode(xorDict, avalue1)) ^ int(recursiveXORdecode(xorDict, avalue2)))
xorDict[akey] = answer
return answer
#########################
def writeToFile(fileName, proxyList):
print("[EMY SCRAPE] Writing to %s..." % fileName)
stringToWrite = "\n".join(proxyList)
f = open(prefix+fileName, 'w')
f.write(stringToWrite)
f.close()
def scrape(http):
if http:
print("[SCRAPER] Beginning to scrape HTTP proxies...")
starttime = time.time()
starttime2 = time.time()
proxyHttp = getProxyHttp()
endtime2 = time.time() - starttime2
httpProxies = proxyHttp
httpProxies = list(set(httpProxies))
endtime = time.time() - starttime
print("[SCRAPER] Total HTTP proxies: %i (%.2fsec)" % (len(httpProxies), endtime))
print(" proxyhttp.net: %i (%.2fsec)" % (len(proxyHttp), endtime2))
print("[SCRAPER] Done.")
return proxyHttp
else:
print("[SCRAPER] Beginning to scrape SOCKS proxies...")
starttime = time.time()
starttime2 = time.time()
socksListNet = getSocksList()
endtime2 = time.time() - starttime2
socksProxies = socksListNet
socksProxies = socksListNet
socksProxies = list(set(socksProxies))
endtime = time.time() - starttime
print("[SCRAPER] Total SOCKS proxies: %i (%.2fsec)" % (len(socksProxies), endtime))
print(" sockslist.net: %i (%.2fsec)" % (len(socksListNet), endtime2))
print("[SCRAPER] Done.")
return socksListNet
if __name__ == "__main__":
log.setLevel(logging.INFO);
if args.verbose:
log.setLevel(logging.DEBUG);
log.debug("Running in verbose mode (-v).")
if args.username and args.password:
log.info("Running with account testing. Going try to login into %s on each step.", args.username)
if args.proxy_file:
with open(args.proxy_file) as f:
proxies = f.read().splitlines()
else:
proxies = [];
for httpProxy in scrape(True):
proxies.append(httpProxy)
for socksProxy in scrape(False):
proxies.append("socks5://" + socksProxy)
proxies.append("socks4://" + socksProxy)
log.info( "Read %s proxies. Starting proxies availability test...", len(proxies) )
# Initialize the Proxy Queue
proxy_queue = Queue()
for proxy in proxies:
proxy_queue.put(proxy)
count = 0
for i in range(0, len(proxies)):
count += 1
if args.username and args.password:
time.sleep(1)
log.debug("Launching thread %s", i)
t = Thread(target=check_single_proxy,
name='check_proxy',
args=(proxy_queue, args.timeout, working_proxies))
t.daemon = True
t.start()
if count % 30 == 0:
time.sleep(5)
# Wait for the queue to finish
proxy_queue.join()
log.info( "Found %s working proxies! Writing to %s", len(working_proxies), args.output_file )
save_to_file( working_proxies, args.output_file )
sys.exit(0)
@Gallifery
Copy link

Maybe a dumb question, but how to use it? Run it with python in the terminal or what?

@subzerofun
Copy link

Thanks for the code!
Would it be hard to add a function that checks the location (country code) of the servers and adds it the the output? Like this:

2016-09-09 ... [ US ][ socks5://123.123.123.42:1234 ] Ready for PokemonGo!

If i could do it myself, i wouldn´t bother you - sorry... but my Python knowledge is very limited. I understand what each part of the code does, but if i try to write(add) my own functions it usually just results in a lot of error messages :-)

@DrKinSlayeR
Copy link

It's possible to modify this script to test proxies with this url like pokemon go-map : https://pgorelease.nianticlabs.com/plfe/rpc. When I change the proxy_test_url it doesn't work, 0 working proxies.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment