Skip to content

Instantly share code, notes, and snippets.

@hatkidchan
Created May 16, 2022 23:24
Show Gist options
  • Save hatkidchan/195cb1073009b982d02a2393be3ce67d to your computer and use it in GitHub Desktop.
Save hatkidchan/195cb1073009b982d02a2393be3ce67d to your computer and use it in GitHub Desktop.
Proxy checker
#!/usr/bin/env python3
import asyncio
import string
import random
from typing import Union, TypeVar, Tuple
from httpx import AsyncClient
from httpx_socks import AsyncProxyTransport
Result = TypeVar('Result')
ProxyResult = Tuple[bool, Union[Exception, Result]]
def random_string(length: int = 8) -> str:
return str.join('', [
random.choice(string.ascii_lowercase + string.digits)
for _ in range(length)
])
async def get_proxies_proxylist(type_: str = 'http') -> list[str]:
async with AsyncClient() as client:
rq = await client.get('https://www.proxy-list.download/api/v1/get',
params={ 'type': type_ })
return [
'%s://%s' % (type_, proxy)
for proxy in rq.text.splitlines()
]
async def get_proxies_proxyscrape(type_: str = 'socks5') -> list[str]:
async with AsyncClient() as client:
rq = await client.get('https://api.proxyscrape.com/v2/',
params={
'request': 'getproxies',
'protocol': type_,
'timeout': 10000,
'country': 'all',
'ssl': 'yes',
'anonymity': 'all',
'simplified': 'true'
})
return [
'%s://%s' % (type_, proxy)
for proxy in rq.text.splitlines()
]
async def check_proxy(proxy: str,
prefix: str,
semaphore: asyncio.Semaphore
) -> ProxyResult[Tuple[str, str]]:
try:
async with semaphore:
transport = AsyncProxyTransport.from_url(proxy)
async with AsyncClient(transport=transport) as client:
req_fut = client.get('https://eth0.me', timeout=8.0)
rq = await asyncio.wait_for(req_fut, timeout=10.0)
print('\033[92m[%s] %s: %r\033[0m' % (prefix, proxy, rq.text),
flush=True)
return True, (proxy, rq.text.strip())
except Exception as e:
print('\033[91m[%s] %s: FAIL: %r\033[0m' % (prefix, proxy, e),
flush=True)
return False, e
async def main():
proxies = (await get_proxies_proxyscrape('socks4')
+ await get_proxies_proxylist('socks5')
+ await get_proxies_proxylist('http'))
n_proxies = len(proxies)
semaphore = asyncio.Semaphore(64)
tasks = [
check_proxy(proxy, '%2d/%2d' % (i + 1, n_proxies), semaphore)
for i, proxy in enumerate(proxies)
]
working_proxies = []
for success, result in await asyncio.gather(*tasks):
if success:
working_proxies.append(result)
print('-' * 32)
for proxy, address in working_proxies:
print('\033[%dm%s\033[0m' % (
32 if address in proxy else 33,
proxy
))
if not working_proxies:
print('\033[31mNothing found.\033[0m')
if __name__ == '__main__':
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment