Skip to content

Instantly share code, notes, and snippets.

@Sovetnikov
Created January 14, 2018 16:20
Show Gist options
  • Save Sovetnikov/d52d089b26537f59a80e3aa8e3928381 to your computer and use it in GitHub Desktop.
Save Sovetnikov/d52d089b26537f59a80e3aa8e3928381 to your computer and use it in GitHub Desktop.
import random
import re
import socket
import ssl
from urllib.parse import urlparse
from urllib.request import urlopen
import socks
from shared.enum import StrEnum
__all__ = ('ProxyProto', 'check_proxy', 'ProxyPublicPort', 'ProxyAnon', 'ProxyURL', 'ProxyPublicPorts')
class ProxyProto(StrEnum):
http = 1
socks4 = 2
socks5 = 3
class ProxyAnon(StrEnum):
none = 1
proxy_usage_seen = 2
high = 3
class ProxyPublicPort(StrEnum):
open = 1,
closed = 2,
ProxyPublicPorts = ProxyPublicPort
class ProxyURL(object):
__slots__ = ('scheme', 'hostname', 'port', 'username', 'password')
def __init__(self, scheme, hostname, port, username=None, password=None):
if isinstance(scheme, str):
scheme = ProxyProto[scheme.lower()]
if not isinstance(scheme, ProxyProto):
raise Exception('scheme должен быть ProxyProto или строкой с именем из ProxyProto')
self.scheme = scheme
self.hostname = hostname
self.port = port
self.username = username
self.password = password
@property
def scheme_as_string(self):
if isinstance(self.scheme, str):
return self.scheme
return self.proxy_proto_to_string.get(self.scheme)
proxy_proto_to_string = {
ProxyProto.http: 'http',
ProxyProto.socks4: 'socks4',
ProxyProto.socks5: 'socks5',
}
_ext_ip = None
_azenv = [
'http://azenv.net/',
'https://www.proxy-listen.de/azenv.php',
'http://www.proxyfire.net/fastenv',
'http://proxyjudge.us/azenv.php',
'http://www.proxy-listen.de/azenv.php']
def _test_open(host, port):
try:
ipaddr = socket.inet_aton(host)
ipaddr = host
except socket.error:
ipaddr = socket.gethostbyname(host)
if not ipaddr:
raise Exception('Не могу определить IP для {0}'.format(host))
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex((ipaddr, int(port)))
return result == 0
IPPattern = re.compile(r'(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)')
def _get_all_ip(page):
# TODO: add IPv6 support
return set(IPPattern.findall(page))
_all_proto = (
(socks.PROXY_TYPE_HTTP, ProxyProto.http),
(socks.PROXY_TYPE_SOCKS5, ProxyProto.socks5),
(socks.PROXY_TYPE_SOCKS4, ProxyProto.socks4),
)
_public_ports = (3128, 1080, 8080)
# _get_ip_url = 'https://icanhazip.com'
_get_ip_url = 'https://wtfismyip.com/text'
def check_proxy(host, port, username=None, password=None, proto=None):
"""
:rtype: (ProxyProto, ProxyAnon, ProxyPublicPort, external_ip)
"""
if not isinstance(port, int):
port = int(port)
try:
ipaddr = socket.inet_aton(host)
ipaddr = host
except socket.error:
ipaddr = socket.gethostbyname(host)
if not ipaddr:
raise Exception('Не могу определить IP для {0}'.format(host))
context = ssl._create_unverified_context()
global _ext_ip
if not _ext_ip:
try:
_ext_ip = urlopen(_get_ip_url, timeout=10, context=context).read().decode()
_ext_ip = _ext_ip.strip()
except Exception as e:
raise Exception('Ошибка получения нашего IP через {0}: {1}'.format(_get_ip_url, str(e)))
if not _test_open(ipaddr, port):
return (None, None, None, None)
if proto:
proto_check = [x for x in _all_proto if x[1] == proto]
if not proto_check:
raise Exception('Неизвестный proto: {0}'.format(proto))
else:
proto_check = _all_proto
errors = {}
for socks_type, proxy_proto in proto_check:
# Не удалется соединиться к http порту, видимо что-то в протоколе HTTP прокси ещё не поддерживается моим кодом
try_j = [x for x in _azenv if 'https' in x]
while True:
if not try_j:
raise Exception('Провайдеры заголовков не работают')
env_host = random.choice(try_j)
env_port = 443 if 'https' in env_host else 80
if not _test_open(urlparse(env_host).netloc, env_port):
try_j.remove(env_host)
else:
break
connect_to = None
with socks.Proxy(socks_type, addr=host, port=port, username=username, password=password):
try:
connect_to = _get_ip_url
proxy_ext_ip = urlopen(connect_to, timeout=15, context=context).read().decode().strip()
# try:
connect_to = env_host
headers = urlopen(connect_to, timeout=15, context=context).read().decode().lower()
# except Exception as e:
# reraise(e, 'Ошибка соединения с {0}'.format(j))
via = headers.count('VIA'.lower()) or headers.count('HTTP_X_FORWARDED_FOR'.lower())
ips = _get_all_ip(headers)
if _ext_ip in ips:
lvl = ProxyAnon.none
elif via:
lvl = ProxyAnon.proxy_usage_seen
else:
lvl = ProxyAnon.high
public_closed = not any(_test_open(ipaddr, x) for x in _public_ports)
if public_closed:
public_port = ProxyPublicPort.closed
else:
public_port = ProxyPublicPort.open
return (proxy_proto, lvl, public_port, proxy_ext_ip)
except Exception as e:
errors[proxy_proto] = 'Ошибка соединения с {0}: {1}'.format(connect_to, str(e))
if errors:
msg = ', '.join(['{0}: {1}'.format(proto.name.upper(), err) for proto, err in errors.items()])
msg = 'Ошибка проверки прокси: {0}'.format(msg)
raise Exception(msg)
return (None, None, None, None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment