Created
January 14, 2018 16:20
-
-
Save Sovetnikov/d52d089b26537f59a80e3aa8e3928381 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import re | |
import socket | |
import ssl | |
from urllib.parse import urlparse | |
from urllib.request import urlopen | |
import socks | |
from shared.enum import StrEnum | |
__all__ = ('ProxyProto', 'check_proxy', 'ProxyPublicPort', 'ProxyAnon', 'ProxyURL', 'ProxyPublicPorts') | |
class ProxyProto(StrEnum):
    """Proxy protocols this module can probe."""

    http = 1
    socks4 = 2
    socks5 = 3
class ProxyAnon(StrEnum):
    """Anonymity level that check_proxy assigns to a working proxy.

    none             -- our own external IP shows up on the judge page
    proxy_usage_seen -- our IP is absent, but the page mentions "via" or
                        "http_x_forwarded_for"
    high             -- neither of the above was found
    """

    none = 1
    proxy_usage_seen = 2
    high = 3
class ProxyPublicPort(StrEnum):
    """Whether any well-known proxy port on the proxy host accepts connections.

    check_proxy reports ``open`` when at least one port from ``_public_ports``
    is reachable and ``closed`` when none of them is.
    """

    # Plain integers, like the sibling enums. The previous trailing commas
    # turned these values into one-element tuples.
    open = 1
    closed = 2


# Plural alias of ProxyPublicPort; both names are exported through __all__.
ProxyPublicPorts = ProxyPublicPort
class ProxyURL(object):
    """Connection details of a single proxy server.

    :param scheme: a ProxyProto member, or a string naming one
        (case-insensitive, e.g. ``'HTTP'`` or ``'socks5'``)
    :param hostname: proxy host, stored as given
    :param port: proxy port, stored as given
    :param username: optional login, stored as given
    :param password: optional password, stored as given
    :raises KeyError: ``scheme`` is a string that names no ProxyProto member
    :raises Exception: ``scheme`` is neither a ProxyProto nor a string
    """

    __slots__ = ('scheme', 'hostname', 'port', 'username', 'password')

    # Scheme text returned by scheme_as_string for each known protocol.
    proxy_proto_to_string = {
        ProxyProto.http: 'http',
        ProxyProto.socks4: 'socks4',
        ProxyProto.socks5: 'socks5',
    }

    def __init__(self, scheme, hostname, port, username=None, password=None):
        # Test for ProxyProto before str: if the enum mixes in str, a member
        # would otherwise be sent through the by-name lookup below.
        if not isinstance(scheme, ProxyProto):
            if not isinstance(scheme, str):
                raise Exception('scheme должен быть ProxyProto или строкой с именем из ProxyProto')
            scheme = ProxyProto[scheme.lower()]

        self.scheme = scheme
        self.hostname = hostname
        self.port = port
        self.username = username
        self.password = password

    @property
    def scheme_as_string(self):
        """Return the scheme as text ('http', 'socks4' or 'socks5').

        A plain string assigned to ``scheme`` after construction is returned
        unchanged; a value with no entry in ``proxy_proto_to_string`` gives
        ``None``.
        """
        scheme = self.scheme
        if isinstance(scheme, str) and not isinstance(scheme, ProxyProto):
            return scheme
        return self.proxy_proto_to_string.get(scheme)
# Our own external IP address; filled in lazily by check_proxy and reused by
# later calls.
_ext_ip = None

# "azenv" judge pages whose response text is scanned for IPs and proxy headers.
# check_proxy currently picks only the entries that contain 'https'.
_azenv = [
    'http://azenv.net/',
    'https://www.proxy-listen.de/azenv.php',
    'http://www.proxyfire.net/fastenv',
    'http://proxyjudge.us/azenv.php',
    'http://www.proxy-listen.de/azenv.php',
]
def _test_open(host, port, timeout=None):
    """Return True when a TCP connection to ``host:port`` can be established.

    :param host: dotted IPv4 address (used as is) or a hostname, which is
        resolved with ``socket.gethostbyname``
    :param port: port number; anything ``int()`` accepts
    :param timeout: optional connect timeout in seconds; ``None`` (the
        default) keeps the socket in blocking mode
    :return: True if ``connect_ex`` reported success, False otherwise
    :raises Exception: the host resolved to an empty address
    """
    try:
        # Used purely as a validity check; the packed result is not needed.
        socket.inet_aton(host)
    except OSError:
        ipaddr = socket.gethostbyname(host)
    else:
        ipaddr = host

    if not ipaddr:
        raise Exception('Не могу определить IP для {0}'.format(host))

    # The context manager closes the socket on every path, so repeated probes
    # do not leak file descriptors.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        if timeout is not None:
            sock.settimeout(timeout)
        return sock.connect_ex((ipaddr, int(port))) == 0
# Dotted-quad IPv4 address with every octet in 0-255. The digit look-arounds
# keep the pattern from matching a fragment of a longer digit run (for example
# '56.1.1.1' inside '256.1.1.1').
IPPattern = re.compile(
    r'(?<!\d)'
    r'(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)'
    r'(?!\d)'
)


def _get_all_ip(page):
    """Return the set of distinct IPv4 addresses found in the text ``page``."""
    # TODO: add IPv6 support
    return set(IPPattern.findall(page))
# Protocols check_proxy tries, in this order, as
# (proxy-type constant of the ``socks`` module, matching ProxyProto member).
_all_proto = (
    (socks.PROXY_TYPE_HTTP, ProxyProto.http),
    (socks.PROXY_TYPE_SOCKS5, ProxyProto.socks5),
    (socks.PROXY_TYPE_SOCKS4, ProxyProto.socks4),
)

# Ports probed on the proxy host to decide between ProxyPublicPort.open and
# ProxyPublicPort.closed.
_public_ports = (3128, 1080, 8080)

# URL whose response body check_proxy decodes, strips and treats as the
# caller's external IP. Previously considered alternative:
# 'https://icanhazip.com'
_get_ip_url = 'https://wtfismyip.com/text'
def _read_url(url, timeout, context):
    """Fetch ``url`` and return the decoded body, closing the response."""
    with urlopen(url, timeout=timeout, context=context) as response:
        return response.read().decode()


def _pick_azenv_judge():
    """Return a randomly chosen HTTPS judge URL whose port accepts connections.

    :raises Exception: none of the HTTPS judges is reachable
    """
    # Only HTTPS judges are used: connecting to the plain-HTTP ones through a
    # proxy does not work, apparently because some part of the HTTP proxy
    # protocol is not supported by this code yet (original author's note).
    candidates = [x for x in _azenv if 'https' in x]
    while True:
        if not candidates:
            raise Exception('Провайдеры заголовков не работают')
        judge = random.choice(candidates)
        judge_port = 443 if 'https' in judge else 80
        if _test_open(urlparse(judge).netloc, judge_port):
            return judge
        candidates.remove(judge)


def check_proxy(host, port, username=None, password=None, proto=None):
    """Probe a proxy and report its protocol, anonymity and port exposure.

    Our own external IP is fetched once from ``_get_ip_url`` and cached in
    the module-level ``_ext_ip``. Each candidate protocol is then tried in
    turn; the first one through which both ``_get_ip_url`` and a judge page
    can be read determines the result.

    :param host: proxy hostname or dotted IPv4 address
    :param port: proxy port; an int or anything ``int()`` accepts
    :param username: optional proxy login, passed to ``socks.Proxy``
    :param password: optional proxy password, passed to ``socks.Proxy``
    :param proto: optional ProxyProto; when given only that protocol is
        tried, otherwise every entry of ``_all_proto`` is
    :rtype: (ProxyProto, ProxyAnon, ProxyPublicPort, external_ip)
    :return: the 4-tuple for the first protocol that works, or
        ``(None, None, None, None)`` when the proxy port does not accept
        TCP connections
    :raises Exception: our external IP cannot be fetched, ``proto`` is not
        in ``_all_proto``, no judge is reachable, or every tried protocol
        failed
    """
    global _ext_ip

    if not isinstance(port, int):
        port = int(port)

    try:
        # Used purely as a validity check; the packed result is not needed.
        socket.inet_aton(host)
    except OSError:
        ipaddr = socket.gethostbyname(host)
    else:
        ipaddr = host
    if not ipaddr:
        raise Exception('Не могу определить IP для {0}'.format(host))

    # NOTE(review): this uses a private ssl helper and disables certificate
    # verification for every request below. It looks deliberate (proxies and
    # judges often have broken TLS), but confirm before reusing elsewhere.
    context = ssl._create_unverified_context()

    if not _ext_ip:
        try:
            _ext_ip = _read_url(_get_ip_url, 10, context).strip()
        except Exception as e:
            raise Exception(
                'Ошибка получения нашего IP через {0}: {1}'.format(_get_ip_url, str(e))
            ) from e

    if not _test_open(ipaddr, port):
        return (None, None, None, None)

    if proto:
        proto_check = [x for x in _all_proto if x[1] == proto]
        if not proto_check:
            raise Exception('Неизвестный proto: {0}'.format(proto))
    else:
        proto_check = _all_proto

    errors = {}
    for socks_type, proxy_proto in proto_check:
        env_host = _pick_azenv_judge()

        # Tracks the URL being fetched so a failure names the right endpoint.
        connect_to = None
        with socks.Proxy(socks_type, addr=host, port=port, username=username, password=password):
            try:
                connect_to = _get_ip_url
                proxy_ext_ip = _read_url(connect_to, 15, context).strip()

                connect_to = env_host
                headers = _read_url(connect_to, 15, context).lower()

                via = 'via' in headers or 'http_x_forwarded_for' in headers
                ips = _get_all_ip(headers)

                if _ext_ip in ips:
                    lvl = ProxyAnon.none
                elif via:
                    lvl = ProxyAnon.proxy_usage_seen
                else:
                    lvl = ProxyAnon.high

                if any(_test_open(ipaddr, x) for x in _public_ports):
                    public_port = ProxyPublicPort.open
                else:
                    public_port = ProxyPublicPort.closed

                return (proxy_proto, lvl, public_port, proxy_ext_ip)
            except Exception as e:
                # Remember the failure and move on to the next protocol.
                errors[proxy_proto] = 'Ошибка соединения с {0}: {1}'.format(connect_to, str(e))

    if errors:
        msg = ', '.join(
            '{0}: {1}'.format(failed_proto.name.upper(), err)
            for failed_proto, err in errors.items()
        )
        raise Exception('Ошибка проверки прокси: {0}'.format(msg))
    return (None, None, None, None)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment