-
-
Save rwjuk/c6a11638400e67f12647499083c25fa2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse | |
import ipaddress | |
import socket | |
import idna | |
import toolforge | |
from datetime import datetime | |
from OpenSSL import SSL | |
from cryptography import x509 | |
from cryptography.x509.oid import NameOID | |
# Pairs of (DNS name fragment, provider display name).  A certificate whose
# subject/SAN entries contain the fragment is attributed to that provider.
proxy_dns_mappings = [
    ("surfshark.com", "Surfshark"),
    ("ipvanish.com", "IPVanish"),
    ("wlvpn.com", "White Label VPN"),
    ("vpnunlimitedapp.com", "VPN Unlimited"),
    ("softether.net", "SoftEther"),
    ("privateinternetaccess.com", "PIA"),
    ("airvpn.org", "AirVPN"),
    ("rapidvpn.com", "RapidVPN"),
    ("purevpn.com", "PureVPN"),
]
# Command-line interface.  The parsed flags are exposed as module-level
# names because the functions below read them as globals.
parser = argparse.ArgumentParser(
    description=(
        "Check an IP address or range for TLS certificates that belong to "
        "known VPN/proxy providers"
    )
)
parser.add_argument("iprange", help="IP range to check")
parser.add_argument(
    "--verbose",
    action="store_true",
    help="also report hosts that could not be checked or did not match",
)
parser.add_argument(
    "--show-unknown-sans",
    action="store_true",
    help="print the certificate names of hosts that match no known provider",
)
parser.add_argument(
    "--wikitext",
    action="store_true",
    help="format matches as {{proxyip4}} wikitext",
)
args = parser.parse_args()

ip_range = args.iprange
verbose = args.verbose
show_unknown_sans = args.show_unknown_sans
output_wikitext = args.wikitext
def get_certificate(hostname, port):
    """Fetch the TLS certificate presented by ``hostname:port``.

    The certificate is retrieved without validating it: the caller only
    wants to inspect the names it contains.

    Args:
        hostname: Host name or IP address (as a string) to connect to; it is
            also sent as the TLS SNI value after IDNA encoding.
        port: TCP port to connect to.

    Returns:
        The peer certificate, converted with ``to_cryptography()``.

    Raises:
        socket.timeout: The TCP connect did not complete within 0.5 seconds.
        socket.error: The TCP connection failed.
        SSL.Error: The TLS handshake failed.
    """
    hostname_idna = idna.encode(hostname)
    sock = socket.socket()
    try:
        # The timeout only bounds the TCP connect; the socket is switched back
        # to blocking mode before the TLS handshake starts.
        sock.settimeout(0.5)
        sock.connect((hostname, port))
        sock.setblocking(1)

        ctx = SSL.Context(SSL.SSLv23_METHOD)  # most compatible
        # pyOpenSSL contexts are configured through set_verify(); accept any
        # certificate, since it is only inspected and never trusted.
        ctx.set_verify(SSL.VERIFY_NONE, lambda *_args: True)

        sock_ssl = SSL.Connection(ctx, sock)
        sock_ssl.set_connect_state()
        sock_ssl.set_tlsext_host_name(hostname_idna)
        sock_ssl.do_handshake()
        return sock_ssl.get_peer_certificate().to_cryptography()
    finally:
        # Release the socket on every path, including failed connects and
        # failed handshakes.
        sock.close()
def get_alt_names(cert):
    """Return the DNS names listed in the certificate's SAN extension.

    Args:
        cert: Certificate object exposing ``extensions``.

    Returns:
        The DNS-name values of the Subject Alternative Name extension, or an
        empty list when the certificate has no such extension.
    """
    try:
        san_extension = cert.extensions.get_extension_for_class(
            x509.SubjectAlternativeName
        )
    except x509.ExtensionNotFound:
        return []
    return san_extension.value.get_values_for_type(x509.DNSName)
def get_common_name(cert):
    """Return the subject Common Name of ``cert``.

    Args:
        cert: Certificate object exposing ``subject``.

    Returns:
        The value of the first Common Name attribute in the subject, or an
        empty string when the subject has no Common Name.
    """
    # The lookup yields an empty list when nothing matches, so an emptiness
    # check covers the "no CN" case without exception handling.
    names = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
    return names[0].value if names else ""
def ip_is_blocked(ip):
    """Report whether ``ip`` currently has an active block on enwiki.

    Args:
        ip: IP address (string or address object); it is converted with
            ``str()`` before being used as the query parameter.

    Returns:
        True if the ``ipblocks`` query returns at least one row whose expiry
        is ``'infinity'`` or later than ``NOW()``, otherwise False.
    """
    conn = toolforge.connect("enwiki")
    try:
        cur = conn.cursor()
        cur.execute(
            "select distinct * from ipblocks where ipb_address=%s and (ipb_expiry='infinity' or ipb_expiry>NOW())",
            (str(ip),),
        )
        return len(cur.fetchall()) > 0
    finally:
        # This runs once per matched host, so always release the connection.
        conn.close()
def verbose_print(output):
    """Print ``output`` only when the module-level ``verbose`` flag is set."""
    if not verbose:
        return
    print(output)
# Expand the command-line target into the list of hosts to probe.
#
# Largest range that will be expanded into a list: the whole IPv4 space.
# Anything bigger (only possible with IPv6) cannot be enumerated in memory.
_MAX_ENUMERABLE_ADDRESSES = 2 ** 32

if "/" not in ip_range:
    # No prefix length given: probe the value as-is.
    ip_addresses = [ip_range]
else:
    ip_network = ipaddress.ip_network(ip_range, strict=False)
    if ip_network.prefixlen == ip_network.max_prefixlen:
        # Single-host network (/32 for IPv4, /128 for IPv6).  Comparing the
        # prefix length avoids mistaking an IPv6 /32 network for one host.
        ip_addresses = [str(ip_network.network_address)]
    elif ip_network.num_addresses > _MAX_ENUMERABLE_ADDRESSES:
        raise SystemExit(
            "{} contains too many addresses to enumerate".format(ip_network)
        )
    else:
        ip_addresses = list(ip_network.hosts())
def check_for_proxy(host):
    """Probe ``host`` on port 443 and classify its TLS certificate.

    Args:
        host: Address to probe; it is converted with ``str()`` for the
            connection and used as given in the report text.

    Returns:
        A one-line report string, or None when there is nothing to report.
        Matches against ``proxy_dns_mappings`` are always reported (as
        wikitext when ``output_wikitext`` is set).  Connection/TLS failures,
        certificates without names and non-matching hosts are only reported
        in verbose mode, except that ``show_unknown_sans`` prints the name
        list of any non-matching host.
    """
    try:
        cert = get_certificate(str(host), 443)
    except socket.timeout:
        return "{} - socket timeout".format(host) if verbose else None
    except socket.error as err:
        return "{} = socket error: {}".format(host, err) if verbose else None
    except SSL.WantReadError as err:
        return "{} - SSL read error: {}".format(host, err) if verbose else None
    except SSL.Error as err:
        return "{} - SSL error: {}".format(host, err) if verbose else None

    subject_name_list = get_alt_names(cert) + [get_common_name(cert)]

    # No SAN entries and an empty CN: nothing usable was extracted from the
    # certificate.
    if subject_name_list == [""]:
        if not verbose:
            return None
        return "{} - no SANs could be found for certificate".format(host)

    # First provider whose DNS fragment occurs in any certificate name, with
    # names scanned in order and providers scanned in table order per name.
    provider = next(
        (
            label
            for cert_name in subject_name_list
            for fragment, label in proxy_dns_mappings
            if fragment in cert_name
        ),
        None,
    )
    if provider is not None:
        blocked = ip_is_blocked(host)
        if output_wikitext:
            return "{{{{proxyip4|{}}}}} - matches {}. {}.".format(host, provider, ("Blocked." if blocked else "Not blocked"))
        return "{} - matches {}. Blocked: {}".format(host, provider, blocked)

    if show_unknown_sans:
        return "{} - SAN list: {}".format(host, subject_name_list)
    if verbose:
        return "{} - does not match any known VPN/proxy DNS name".format(host)
    return None
import concurrent.futures

# Probe hosts concurrently and print reports in input order.  The pool is
# capped at 8 threads; ThreadPoolExecutor rejects max_workers=0, so keep at
# least one worker even when there are no addresses to check.
workers = max(1, min(len(ip_addresses), 8))
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
    for proxy_check in executor.map(check_for_proxy, ip_addresses):
        # None means the host produced nothing worth reporting.
        if proxy_check is not None:
            print(proxy_check)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment