Created
May 16, 2022 23:24
-
-
Save hatkidchan/195cb1073009b982d02a2393be3ce67d to your computer and use it in GitHub Desktop.
Proxy checker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import asyncio | |
import string | |
import random | |
from typing import Union, TypeVar, Tuple | |
from httpx import AsyncClient | |
from httpx_socks import AsyncProxyTransport | |
# Type variable standing for the payload carried by a successful check.
Result = TypeVar('Result')
# Outcome of a check as a (success, payload) pair. check_proxy returns
# (True, <Result>) on success and (False, <caught exception>) on failure.
ProxyResult = Tuple[bool, Union[Exception, Result]]
def random_string(length: int = 8) -> str:
    """Return a random string of lowercase ASCII letters and digits.

    Args:
        length: Number of characters to generate. Zero or a negative
            value yields an empty string.

    Returns:
        A string of ``length`` characters, each picked with
        ``random.choice`` from ``string.ascii_lowercase + string.digits``.

    Note:
        The ``random`` module is not cryptographically secure; do not use
        the result as a secret or token.
    """
    # Build the alphabet once instead of re-concatenating it per character.
    alphabet = string.ascii_lowercase + string.digits
    return ''.join(random.choice(alphabet) for _ in range(length))
async def get_proxies_proxylist(type_: str = 'http') -> list[str]:
    """Fetch a proxy list from the proxy-list.download API.

    Args:
        type_: Proxy protocol. It is sent as the ``type`` query parameter
            and also used as the URL scheme of every returned entry.

    Returns:
        One ``'<type_>://<entry>'`` string per non-blank line of the
        response body, in the order the lines appear.

    Raises:
        Whatever ``AsyncClient.get`` raises on a failed request; nothing
        is caught here.
    """
    async with AsyncClient() as client:
        rq = await client.get('https://www.proxy-list.download/api/v1/get',
                              params={'type': type_})
        # Strip surrounding whitespace and skip blank lines so the result
        # never contains an address-less entry such as 'http://'.
        return [
            '%s://%s' % (type_, entry)
            for entry in map(str.strip, rq.text.splitlines())
            if entry
        ]
async def get_proxies_proxyscrape(type_: str = 'socks5') -> list[str]:
    """Fetch a proxy list from the proxyscrape.com v2 API.

    Args:
        type_: Proxy protocol. It is sent as the ``protocol`` query
            parameter and also used as the URL scheme of every returned
            entry.

    Returns:
        One ``'<type_>://<entry>'`` string per non-blank line of the
        response body, in the order the lines appear.

    Raises:
        Whatever ``AsyncClient.get`` raises on a failed request; nothing
        is caught here.
    """
    query = {
        'request': 'getproxies',
        'protocol': type_,
        'timeout': 10000,
        'country': 'all',
        'ssl': 'yes',
        'anonymity': 'all',
        'simplified': 'true',
    }
    async with AsyncClient() as client:
        rq = await client.get('https://api.proxyscrape.com/v2/', params=query)
        # Strip surrounding whitespace and skip blank lines so the result
        # never contains an address-less entry such as 'socks5://'.
        return [
            '%s://%s' % (type_, entry)
            for entry in map(str.strip, rq.text.splitlines())
            if entry
        ]
async def check_proxy(proxy: str,
                      prefix: str,
                      semaphore: asyncio.Semaphore
                      ) -> ProxyResult[Tuple[str, str]]:
    """Try one HTTPS request through ``proxy`` and report the outcome.

    The request targets https://eth0.me (presumably a service that echoes
    the caller's IP address -- confirm) and is made while holding
    ``semaphore``. A coloured progress line is printed for either outcome.

    Args:
        proxy: Proxy URL handed to ``AsyncProxyTransport.from_url``.
        prefix: Label shown in square brackets at the start of the
            printed line.
        semaphore: Held for the duration of the attempt.

    Returns:
        ``(True, (proxy, body))`` with the whitespace-stripped response
        body on success, or ``(False, exc)`` with the caught exception.
    """
    try:
        # Items are entered left to right, so the semaphore is acquired
        # before the proxy transport is built.
        async with semaphore, AsyncClient(
            transport=AsyncProxyTransport.from_url(proxy)
        ) as client:
            # The request has its own 8 s timeout; wait_for adds an outer
            # 10 s limit on the whole call.
            response = await asyncio.wait_for(
                client.get('https://eth0.me', timeout=8.0),
                timeout=10.0,
            )
            body = response.text
            print('\033[92m[%s] %s: %r\033[0m' % (prefix, proxy, body),
                  flush=True)
            return True, (proxy, body.strip())
    except Exception as error:
        # Any failure (bad URL, connection error, timeout, ...) marks the
        # proxy as not working instead of aborting the whole run.
        print('\033[91m[%s] %s: FAIL: %r\033[0m' % (prefix, proxy, error),
              flush=True)
        return False, error
async def main():
    """Collect proxies from the list sources, check them, print the results.

    The sources are queried one after another, then every candidate is
    checked concurrently under a shared semaphore of 64. After a separator
    line, each working proxy is printed in green when the body it returned
    appears inside its own URL and in yellow otherwise. A red
    "Nothing found." line is printed when no proxy worked.
    """
    candidates = []
    candidates.extend(await get_proxies_proxyscrape('socks4'))
    candidates.extend(await get_proxies_proxylist('socks5'))
    candidates.extend(await get_proxies_proxylist('http'))

    total = len(candidates)
    limiter = asyncio.Semaphore(64)
    outcomes = await asyncio.gather(*(
        check_proxy(url, '%2d/%2d' % (position, total), limiter)
        for position, url in enumerate(candidates, start=1)
    ))
    # Keep only the payloads of successful checks, in input order.
    alive = [payload for ok, payload in outcomes if ok]

    print('-' * 32)
    if not alive:
        print('\033[31mNothing found.\033[0m')
        return
    for url, reported in alive:
        # 32 = green: the reported address is part of the proxy URL.
        # 33 = yellow: the response body was something else.
        colour = 32 if reported in url else 33
        print('\033[%dm%s\033[0m' % (colour, url))
# Run the checker only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == '__main__':
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment