Created
September 9, 2016 03:41
-
-
Save a-moss/1578eb07b2570b5d97d85b1e93e81cc8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cfscrape | |
import json | |
import sys | |
import argparse | |
import requests | |
import logging | |
import time | |
import random | |
import urllib2 | |
from pgoapi import PGoApi | |
from pgoapi.exceptions import ServerSideRequestThrottlingException | |
from queue import Queue | |
from threading import Thread | |
# Countries whose proxies should be skipped. NOTE(review): not referenced
# anywhere visible in this chunk -- presumably consumed by a geo-lookup step
# elsewhere; confirm before removing.
ignore = [ 'China' ]
##### END OF CONFIGURATION #####
# Module-wide logger; every message is prefixed with a timestamp.
logging.basicConfig(format='%(asctime)s %(message)s')
log = logging.getLogger(__name__)
# Command-line interface for the proxy checker/scraper.
parser = argparse.ArgumentParser()
parser.add_argument("-f", "--proxy-file", help="Text file containing proxies. One proxy per line like http://x.x.x.x:3128")
parser.add_argument("-o", "--output-file", help="Output file with working proxies", default="working_proxies.txt")
parser.add_argument("--proxychains", help="Output in proxychains-ng format. Something like 'http x.x.x.x 3128'", action='store_true')
parser.add_argument("-t", "--timeout", help="Proxy timeout. Default is 15 seconds.", default=15, type=float)
parser.add_argument("-v", "--verbose", help="Run in the verbose mode.", action='store_true')
parser.add_argument("-u", "--username", help="Username if you want to test account connectivity")
parser.add_argument("-p", "--password", help="Password if you want to test account connectivity")
args = parser.parse_args()
# Shared accumulator appended to by the checker threads (list.append is
# atomic under the GIL, so no explicit lock is used here).
working_proxies = []
def check_pogo_login(proxy, username, password):
    """Try a PTC login through *proxy* to verify the proxy works for the game API.

    Exits the whole program (status 1) if the test account itself is banned,
    since no proxy can fix that. Retries indefinitely with a random back-off
    on server-side throttling.

    :param proxy: proxy URL, e.g. "http://x.x.x.x:3128"
    :param username: PTC account username
    :param password: PTC account password
    """
    while True:
        try:
            api = PGoApi()
            api.set_proxy({'http': proxy, 'https': proxy})
            # New York City coordinates; any valid position works for a login test.
            api.set_position(40.7127837, -74.005941, 0.0)
            api.login('ptc', username, password)
            time.sleep(1)
            req = api.create_request()
            req.get_inventory()
            response = req.call()
            # Status code 3 means the account is banned -- abort everything.
            if response['status_code'] == 3:
                log.error("The account you're using to test is banned. Try another one.")
                sys.exit(1)
            return
        except ServerSideRequestThrottlingException:
            # Back off 7.00-10.99 seconds, then retry. Iterative loop instead of
            # the original self-recursion, which could blow the stack under
            # sustained throttling.
            secs = random.randrange(700, 1100) / 100
            log.debug("[ %s\t] Server side throttling, Waiting %s seconds!", proxy, secs)
            time.sleep(secs)
def check_single_proxy(proxy_queue, timeout, working_proxies):
    """Pop one proxy from *proxy_queue* and test it against the PTC SSO page.

    Appends working proxies to *working_proxies*. Marks the queue item done in
    a single ``finally`` block -- the original called ``task_done()`` on four
    separate paths, where a missed path would hang ``proxy_queue.join()``.

    :param proxy_queue: queue.Queue of proxy URL strings
    :param timeout: per-request timeout in seconds
    :param working_proxies: shared list collecting proxies that passed
    :return: True if the proxy works, False otherwise
    """
    proxy_test_url = 'https://sso.pokemon.com/'
    proxy = proxy_queue.get_nowait()
    try:
        if not proxy:
            proxy_error = "Empty Proxy."
        else:
            try:
                proxy_response = requests.get(proxy_test_url, proxies={'http': proxy, 'https': proxy}, timeout=timeout)
                if proxy_response.status_code == 200:
                    # Optionally verify a real login works through this proxy.
                    if args.username and args.password:
                        check_pogo_login(proxy, args.username, args.password)
                    log.info("[ %s\t] Ready for PokemonGo!", proxy)
                    working_proxies.append(proxy)
                    return True
                proxy_error = "Invalid Status Code - " + str(proxy_response.status_code)
            except (requests.ConnectTimeout, requests.ConnectionError):
                # Dead or unreachable proxy: drop it without logging noise,
                # matching the original's silent handling of these two cases.
                return False
            except Exception as e:
                proxy_error = e
        log.info("[ %s\t] Error: %s", proxy, proxy_error)
        return False
    finally:
        # Exactly one task_done() per get_nowait(), on every exit path.
        proxy_queue.task_done()
def save_to_file( proxies, file, proxychains=None ):
    """Write *proxies* to *file*, one per line.

    :param proxies: iterable of proxy URLs like "http://x.x.x.x:3128"
    :param file: output file path (name kept for caller compatibility,
        even though it shadows the builtin)
    :param proxychains: when True, emit proxychains-ng format
        ("protocol ip port"); when None (the default), fall back to the
        --proxychains command-line flag, preserving the original behavior.
    """
    if proxychains is None:
        proxychains = args.proxychains
    # Mode "w" already truncates; the original leaked the handle (no close())
    # and called truncate() redundantly.
    with open( file, "w" ) as out:
        for proxy in proxies:
            if proxychains:
                # Split "protocol://ip:port" into its three parts. rsplit on
                # the last ":" keeps the port even if the host contains colons.
                protocol, address = proxy.split("://", 1)
                ip, port = address.rsplit(":", 1)
                out.write( protocol + " " + ip + " " + port + "\n")
            else:
                out.write(proxy + "\n")
def getWebData(url):
    """Fetch *url* and return its raw response lines.

    Sends a browser-like User-Agent, since some proxy-list sites reject the
    default urllib2 agent string.
    """
    opener = urllib2.build_opener()
    opener.addheaders = [('User-agent', 'Mozilla/5.0')]
    response = opener.open(url)
    lines = response.readlines()
    opener.close()
    return lines
##### proxyhttp.net #####
def getProxyHttp():
    """Scrape pages 1-9 of proxyhttp.net's anonymous-proxy list.

    :return: list of "ip:port" strings (duplicates possible across pages)
    """
    base = "http://proxyhttp.net/free-list/anonymous-server-hide-ip-address"
    # Page 1 has no page-number path segment; pages 2-9 do. This replaces the
    # original's nine copy-pasted URL variables.
    pages = [base + "#proxylist"]
    pages += [base + "/%d#proxylist" % n for n in range(2, 10)]
    result = []
    for page in pages:
        result += parseSocksListPage(page)
    return result
##### sockslist.net #####
def getSocksList():
    """Scrape pages 1-3 of sockslist.net's SOCKS proxy list.

    :return: list of "ip:port" strings (duplicates possible across pages)
    """
    base = "http://sockslist.net/proxy/server-socks-hide-ip-address"
    # Page 1 has no page-number path segment; pages 2-3 do.
    pages = [base + "#proxylist"]
    pages += [base + "/%d#proxylist" % n for n in range(2, 4)]
    result = []
    for page in pages:
        result += parseSocksListPage(page)
    return result
def parseSocksListPage(url):
    """Download one proxy-list page and return a list of "ip:port" strings.

    The target sites obfuscate ports as XOR expressions ("a^b^7") over
    variables declared on a "a = 1; b = a^2; ..." line; crazyXORdecoding()
    resolves those variables to numbers.

    :param url: page URL to fetch and parse
    :return: list of "ip:port" strings
    """
    # Fix: the original left xorDict unbound until an XOR-definition line was
    # seen, raising NameError on pages without one (or with the definitions
    # appearing after the first port expression that needs them).
    xorDict = {}
    ipport = {}
    ip = ""
    port = ""
    webData = getWebData(url)
    for line in webData:
        if len(line) > 0:
            if "^" in line and ";" in line and " = " in line:
                # Variable-definition line, e.g. "a = 1; b = a^2; ..."
                xorDict = crazyXORdecoding(line.strip())
            else:
                if "/check?i" in line and ip == "":
                    ip = line.split("=")[1].split(":")[0]
                if ">check</a>" in line and ip != "":
                    # The port expression sits inside parentheses, e.g. "(a^b^7)".
                    port = line.split("(")[1].split(")")[0]
                    ipport[ip] = port
                    ip = ""
                    port = ""
    # Evaluate each port expression: digit parts pass through as literals,
    # names are looked up in the decoded variable table, then XORed together.
    finalipport = {}
    for key in ipport:
        portparts = ipport[key].split("^")
        translateparts = []
        for i in portparts:
            if i.isdigit():
                translateparts.append(int(i))
            else:
                translateparts.append(int(xorDict[i]))
        total = translateparts[0]
        for j in range(1, len(translateparts)):
            total = total ^ translateparts[j]
        finalipport[key] = str(total)
    result = []
    for key in finalipport:
        result.append(key + ":" + finalipport[key])
    return result
def crazyXORdecoding(line):
    """Parse a "name = expr; name = expr; ..." obfuscation line into a dict
    and resolve every entry to its numeric value in place.

    :param line: semicolon-separated assignment list from the proxy page
    :return: dict mapping variable name -> numeric string value
    """
    xorDict = {}
    # Collect the raw assignments first...
    for piece in line.split(";"):
        if "=" in piece:
            parts = piece.split(" = ")
            xorDict[parts[0]] = parts[1]
    # ...then resolve each one; recursiveXORdecode memoizes into xorDict.
    for name in xorDict:
        recursiveXORdecode(xorDict, name)
    return xorDict
def recursiveXORdecode(xorDict, akey):
    """Resolve *akey* to its numeric string value.

    Keys map either to digit strings or to "x^y" XOR expressions whose
    operands are other keys or literals. Resolved values are memoized back
    into *xorDict* so repeated lookups are cheap.

    :param xorDict: mutable mapping of name -> digit string or "x^y" expression
    :param akey: key (or literal digit string) to resolve
    :return: numeric string, or None for an unrecognized value form
    """
    # A literal number resolves to itself.
    if akey.isdigit():
        return akey
    value = xorDict[akey]
    if value.isdigit():
        return value
    if "^" in value:
        left, right = value.split("^")
        resolved = str(int(recursiveXORdecode(xorDict, left)) ^
                       int(recursiveXORdecode(xorDict, right)))
        xorDict[akey] = resolved  # memoize for later callers
        return resolved
#########################
def writeToFile(fileName, proxyList):
    """Write *proxyList* to *fileName*, one proxy per line (no trailing newline).

    :param fileName: output file path
    :param proxyList: list of proxy strings
    """
    print("[EMY SCRAPE] Writing to %s..." % fileName)
    stringToWrite = "\n".join(proxyList)
    # Fix: the original opened open(prefix+fileName) with 'prefix' never
    # defined anywhere in the file, so every call raised NameError. Write
    # directly to fileName, and let the context manager close the handle.
    with open(fileName, 'w') as f:
        f.write(stringToWrite)
def scrape(http):
    """Scrape a free-proxy site and return its proxies as "ip:port" strings.

    :param http: True -> HTTP proxies from proxyhttp.net;
                 False -> SOCKS proxies from sockslist.net
    :return: the raw (possibly duplicated) scraped list -- the de-duplicated
             set is computed only for the reported count, matching the
             original behavior.

    Fixes vs. original: the two branches were near-identical copy-paste with a
    duplicated ``socksProxies = socksListNet`` assignment and two redundant
    timers around the same single call; both branches are unified here.
    """
    kind = "HTTP" if http else "SOCKS"
    source = "proxyhttp.net" if http else "sockslist.net"
    print("[SCRAPER] Beginning to scrape %s proxies..." % kind)
    starttime = time.time()
    proxies = getProxyHttp() if http else getSocksList()
    endtime = time.time() - starttime
    unique_count = len(set(proxies))
    print("[SCRAPER] Total %s proxies: %i (%.2fsec)" % (kind, unique_count, endtime))
    print(" %s: %i (%.2fsec)" % (source, len(proxies), endtime))
    print("[SCRAPER] Done.")
    return proxies
if __name__ == "__main__":
    # Default to INFO; -v switches on DEBUG.
    log.setLevel(logging.INFO);
    if args.verbose:
        log.setLevel(logging.DEBUG);
        log.debug("Running in verbose mode (-v).")
    if args.username and args.password:
        log.info("Running with account testing. Going try to login into %s on each step.", args.username)
    # Proxy source: an explicit file (-f), otherwise the built-in scrapers.
    if args.proxy_file:
        with open(args.proxy_file) as f:
            proxies = f.read().splitlines()
    else:
        proxies = [];
        for httpProxy in scrape(True):
            proxies.append(httpProxy)
        for socksProxy in scrape(False):
            # Scraped SOCKS entries carry no scheme; queue both SOCKS
            # versions and let the checker discover which (if either) works.
            proxies.append("socks5://" + socksProxy)
            proxies.append("socks4://" + socksProxy)
    log.info( "Read %s proxies. Starting proxies availability test...", len(proxies) )
    # Initialize the Proxy Queue
    proxy_queue = Queue()
    for proxy in proxies:
        proxy_queue.put(proxy)
    count = 0
    # One daemon thread per proxy; each thread pops exactly one queue item.
    for i in range(0, len(proxies)):
        count += 1
        if args.username and args.password:
            # Stagger launches when login testing, to go easy on the PTC servers.
            time.sleep(1)
        log.debug("Launching thread %s", i)
        t = Thread(target=check_single_proxy,
                   name='check_proxy',
                   args=(proxy_queue, args.timeout, working_proxies))
        t.daemon = True
        t.start()
        if count % 30 == 0:
            # Brief pause every 30 threads to limit concurrent connections.
            time.sleep(5)
    # Wait for the queue to finish
    proxy_queue.join()
    log.info( "Found %s working proxies! Writing to %s", len(working_proxies), args.output_file )
    save_to_file( working_proxies, args.output_file )
    sys.exit(0)
Thanks for the code!
Would it be hard to add a function that checks the location (country code) of each server and adds it to the output? Like this:
2016-09-09 ... [ US ][ socks5://123.123.123.42:1234 ] Ready for PokemonGo!
If I could do it myself, I wouldn't bother you — sorry, but my Python knowledge is very limited. I understand what each part of the code does, but when I try to write or add my own functions it usually just results in a lot of error messages :-)
Is it possible to modify this script to test proxies against the URL that PokemonGo-Map uses, https://pgorelease.nianticlabs.com/plfe/rpc? When I change proxy_test_url to that address, I get 0 working proxies.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Maybe a dumb question, but how do I use it? Run it with Python in the terminal, or something else?