Skip to content

Instantly share code, notes, and snippets.

@rwjuk
Created March 11, 2021 17:40
Show Gist options
  • Save rwjuk/c6a11638400e67f12647499083c25fa2 to your computer and use it in GitHub Desktop.
Save rwjuk/c6a11638400e67f12647499083c25fa2 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import argparse
import ipaddress
import socket
import idna
import toolforge
from datetime import datetime
from OpenSSL import SSL
from cryptography import x509
from cryptography.x509.oid import NameOID
# Known VPN/proxy providers. Each entry pairs a DNS domain with the display
# name of the provider; names found in a host's TLS certificate are compared
# against the domain to attribute the host to that provider.
proxy_dns_mappings = [
    ("surfshark.com", "Surfshark"),
    ("ipvanish.com", "IPVanish"),
    ("wlvpn.com", "White Label VPN"),
    ("vpnunlimitedapp.com", "VPN Unlimited"),
    ("softether.net", "SoftEther"),
    ("privateinternetaccess.com", "PIA"),
    ("airvpn.org", "AirVPN"),
    ("rapidvpn.com", "RapidVPN"),
    ("purevpn.com", "PureVPN"),
]
# Command-line interface. The arguments are parsed at module level, and the
# resulting values are stored in module-level names that the functions below
# read directly.
parser = argparse.ArgumentParser(
    description=(
        "Check the TLS certificates served by an IP address or range for "
        "names belonging to known VPN/proxy providers"
    )
)
parser.add_argument("iprange", help="IP address or CIDR range to check")
parser.add_argument(
    "--verbose",
    action="store_true",
    help="also report hosts that could not be checked or matched no provider",
)
parser.add_argument(
    "--show-unknown-sans",
    action="store_true",
    help="print the certificate name list of hosts that match no known provider",
)
parser.add_argument(
    "--wikitext",
    action="store_true",
    help="format matches as wikitext using the proxyip4 template",
)
args = parser.parse_args()

ip_range = args.iprange  # raw target string as typed by the user
verbose = args.verbose  # report failures and non-matches too
show_unknown_sans = args.show_unknown_sans  # dump names of unmatched certificates
output_wikitext = args.wikitext  # emit matches as wikitext
def get_certificate(hostname, port):
    """Fetch the TLS certificate presented by ``hostname:port``.

    Opens a TCP connection, performs a TLS handshake (sending ``hostname`` as
    the SNI name) and returns the peer certificate converted to a
    ``cryptography`` certificate object. The socket is always closed before
    returning, including when the connection or handshake fails.

    Raises whatever the socket layer or pyOpenSSL raise on failure (for
    example ``socket.timeout``, ``OSError`` or ``SSL.Error``).
    """
    hostname_idna = idna.encode(hostname)
    sock = socket.socket()
    try:
        # The short timeout applies to the TCP connect only; the socket is
        # switched back to blocking mode before the TLS handshake starts.
        sock.settimeout(0.5)
        sock.connect((hostname, port))
        sock.setblocking(1)

        ctx = SSL.Context(SSL.SSLv23_METHOD)  # most compatible
        # NOTE(review): these attribute names mirror the stdlib
        # ssl.SSLContext API; confirm pyOpenSSL's Context honours them (its
        # documented way to configure verification is Context.set_verify).
        ctx.check_hostname = False
        ctx.verify_mode = SSL.VERIFY_NONE

        sock_ssl = SSL.Connection(ctx, sock)
        sock_ssl.set_connect_state()
        sock_ssl.set_tlsext_host_name(hostname_idna)
        sock_ssl.do_handshake()
        cert = sock_ssl.get_peer_certificate()
        return cert.to_cryptography()
    finally:
        # Release the file descriptor on every path; previously a failed
        # connect or handshake left the socket open.
        sock.close()
def get_alt_names(cert):
    """Return the DNS names from the certificate's SubjectAlternativeName.

    Returns an empty list when the certificate has no such extension.
    """
    try:
        san_extension = cert.extensions.get_extension_for_class(
            x509.SubjectAlternativeName
        )
    except x509.ExtensionNotFound:
        return []
    return san_extension.value.get_values_for_type(x509.DNSName)
def get_common_name(cert):
    """Return the subject Common Name of ``cert``, or ``""`` if it has none."""
    names = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
    # The lookup yields a list that is empty when the subject carries no CN;
    # it performs no extension lookup, so the only case to handle is the
    # empty list.
    return names[0].value if names else ""
def ip_is_blocked(ip):
    """Return True if ``ip`` has a currently active block on enwiki.

    Queries the ``ipblocks`` table for rows whose ``ipb_address`` equals
    ``str(ip)`` exactly and whose expiry is either ``'infinity'`` or later
    than the database's ``NOW()``.

    A new database connection is opened for the lookup and closed again
    before returning, so repeated calls do not accumulate open connections.
    """
    # presumably a PyMySQL connection (toolforge.connect) -- verify if the
    # connection library ever changes.
    conn = toolforge.connect("enwiki")
    try:
        with conn.cursor() as cur:
            cur.execute(
                "select distinct * from ipblocks where ipb_address=%s and (ipb_expiry='infinity' or ipb_expiry>NOW())",
                (str(ip),),
            )
            # Only existence matters, so a single row is enough.
            return cur.fetchone() is not None
    finally:
        conn.close()
def verbose_print(output):
    """Print ``output`` only when the module-level ``verbose`` flag is set."""
    if not verbose:
        return
    print(output)
# Expand the command-line target into the list of hosts to probe.
if "/" not in ip_range:
    # A bare target (no prefix length) is probed as-is.
    ip_addresses = [ip_range]
else:
    # Parse the prefix properly instead of searching for the text "/32":
    # an IPv6 "/32" is a very large network, not a single host, and a
    # malformed prefix such as "/320" must be rejected rather than mangled.
    ip_network = ipaddress.ip_network(ip_range, strict=False)
    if ip_network.num_addresses == 1:
        # Single-address network (IPv4 /32, IPv6 /128): probe that address.
        ip_addresses = [str(ip_network.network_address)]
    else:
        ip_addresses = list(ip_network.hosts())
def _match_known_proxy(names):
    """Return the provider whose domain covers one of ``names``, else None."""
    for entry in names:
        candidate = entry.lower().rstrip(".")
        for domain, proxy in proxy_dns_mappings:
            # Match the domain itself or any subdomain of it. A plain
            # substring test would also accept lookalikes such as
            # "notsurfshark.com" or "surfshark.com.example.net".
            if candidate == domain or candidate.endswith("." + domain):
                return proxy
    return None


def check_for_proxy(host):
    """Probe ``host`` on port 443 and report whether it looks like a VPN/proxy.

    The host's TLS certificate names (SAN DNS entries plus the subject CN)
    are compared against ``proxy_dns_mappings``.

    Returns a one-line report string, or ``None`` when there is nothing to
    report under the current command-line flags:

    * connection/TLS failures and certificates without names are reported
      only when ``verbose`` is set;
    * a match is always reported, together with the result of
      ``ip_is_blocked`` (as wikitext when ``output_wikitext`` is set);
    * a non-match is reported with its name list when ``show_unknown_sans``
      is set, otherwise only when ``verbose`` is set.
    """
    try:
        cert = get_certificate(str(host), 443)
    except socket.timeout:
        return "{} - socket timeout".format(host) if verbose else None
    except socket.error as e:
        return "{} = socket error: {}".format(host, e) if verbose else None
    except SSL.WantReadError as e:
        return "{} - SSL read error: {}".format(host, e) if verbose else None
    except SSL.Error as e:
        return "{} - SSL error: {}".format(host, e) if verbose else None

    subject_name_list = get_alt_names(cert) + [get_common_name(cert)]
    if subject_name_list == [""]:
        # No SAN entries and an empty CN: nothing to match against.
        if verbose:
            return "{} - no SANs could be found for certificate".format(host)
        return None

    proxy = _match_known_proxy(subject_name_list)
    if proxy is not None:
        blocked = ip_is_blocked(host)
        if output_wikitext:
            # The format string already ends the sentence with a period, so
            # the status text must not carry its own ("Blocked.." before).
            return "{{{{proxyip4|{}}}}} - matches {}. {}.".format(
                host, proxy, ("Blocked" if blocked else "Not blocked")
            )
        return "{} - matches {}. Blocked: {}".format(host, proxy, blocked)

    if show_unknown_sans:
        return "{} - SAN list: {}".format(host, subject_name_list)
    if verbose:
        return "{} - does not match any known VPN/proxy DNS name".format(host)
    return None
import concurrent.futures

# Probe the hosts concurrently: at most eight threads, fewer when there are
# fewer targets than that.
workers = min(len(ip_addresses), 8)
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
    # Executor.map yields results in input order, so the report lines follow
    # the order of ip_addresses.
    for report_line in executor.map(check_for_proxy, ip_addresses):
        if report_line is None:
            continue
        print(report_line)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment