Last active
July 28, 2018 09:48
-
-
Save deeso/d64e91412388f7f2428a926e0b3c8ef5 to your computer and use it in GitHub Desktop.
Enumerate the SSL certificates of the top 1M sites.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket
import ssl
import traceback
from collections import Counter
from datetime import datetime
from multiprocessing import Pool

from OpenSSL import SSL
# Long-form attribute names used as keys in the issuer/subject dictionaries
# built by convert_issuer() and convert_subject().
CN_S = 'commonName'
O_S = 'organizationName'
C_S = 'countryName'
ST_S = 'State'
L_S = 'localityName'

# Not referenced by any code visible in this file.
last_tb = None
def connect(version, hostname, port):
    """Handshake with ``hostname:port`` and return the peer certificate.

    Args:
        version: pyOpenSSL protocol method constant passed to ``SSL.Context``.
        hostname: Host to connect to; also sent as the SNI server name so
            that name-based virtual hosts present the matching certificate.
        port: TCP port to connect to.

    Returns:
        The object returned by ``Connection.get_peer_certificate()``, or
        ``None`` if the connection or handshake failed for any reason.
    """
    cert = None
    s = None
    try:
        ctx = SSL.Context(version)
        s = socket.socket()
        s.connect((hostname, port))
        conn = SSL.Connection(ctx, s)
        # Send SNI, as the stdlib-based host_cert2() does via server_hostname.
        conn.set_tlsext_host_name(hostname.encode('idna'))
        conn.set_connect_state()
        conn.do_handshake()
        cert = conn.get_peer_certificate()
        conn.close()
    except Exception:
        # Best effort: any failure simply means "no certificate". Catching
        # Exception (not a bare except) lets Ctrl-C / SystemExit propagate.
        if s is not None:
            s.close()
    return cert
def get_certinfo(ssl_cert):
    """Flatten a certificate dict from ``SSLSocket.getpeercert()``.

    Args:
        ssl_cert: Mapping in the ``getpeercert()`` layout (``issuer`` and
            ``subject`` are sequences of RDNs, each RDN a sequence of
            ``(name, value)`` pairs), or an empty/``None`` value.

    Returns:
        A dict with the keys ``notAfter``, ``notBefore``, ``serial_number``,
        ``issuer``, ``subject``, ``subjectAltName`` and ``version``; an empty
        dict when ``ssl_cert`` is empty or ``None``.
    """
    cert_info = {}
    # The emptiness check must look at the *input*; checking the freshly
    # created result dict made the function return {} unconditionally.
    if not ssl_cert:
        return cert_info
    cert = ssl_cert
    cert_info['notAfter'] = cert.get('notAfter')
    cert_info['notBefore'] = cert.get('notBefore')
    cert_info['serial_number'] = cert.get('serialNumber')
    # Only the first (name, value) pair of each RDN is kept.
    cert_info['issuer'] = dict(rdn[0] for rdn in cert.get('issuer', ()) if rdn)
    cert_info['subject'] = dict(rdn[0] for rdn in cert.get('subject', ()) if rdn)
    # Entries are (type, value) pairs; require both elements before indexing.
    cert_info['subjectAltName'] = [
        entry[1] for entry in cert.get('subjectAltName', ()) if len(entry) > 1
    ]
    cert_info['version'] = cert.get('version')
    return cert_info
def get_ssl_info2(hostname):
    """Look up the certificate for ``hostname`` using the stdlib ``ssl`` path.

    Tries ``host_cert2(hostname)`` first; when that yields nothing, falls
    back to the pyOpenSSL-based ``get_ssl_wwwinfo``.

    Returns:
        A ``(hostname, cert_info)`` tuple; ``cert_info`` is empty when every
        attempt failed.
    """
    cert_info = get_certinfo(host_cert2(hostname))
    if cert_info:
        return hostname, cert_info
    try:
        return get_ssl_wwwinfo(hostname)
    except Exception:
        # Report and fall through to the empty result; a bare ``except``
        # would also swallow KeyboardInterrupt/SystemExit in pool workers.
        traceback.print_exc()
    return hostname, cert_info
def get_ssl_wwwinfo2(hostname):
    """Look up the certificate served for ``www.<hostname>``.

    Uses the stdlib-based ``host_cert2``; if that produces no certificate
    information, defers to ``get_ssl_info`` for the bare hostname.

    Returns:
        A ``(hostname, cert_info)`` tuple.
    """
    www_info = get_certinfo(host_cert2('www.' + hostname))
    if len(www_info) != 0:
        return hostname, www_info
    return get_ssl_info(hostname)
def host_cert2(hostname, port=443, timeout=None):
    """Fetch the peer certificate of ``hostname:port`` with the stdlib ``ssl``.

    Args:
        hostname: Host to connect to; also used as the SNI server name.
        port: TCP port to connect to (previously ignored in favour of a
            hard-coded 443).
        timeout: Optional socket timeout in seconds; ``None`` (the default)
            keeps the blocking behaviour.

    Returns:
        The dict returned by ``SSLSocket.getpeercert()``, or an empty dict
        when the connection, handshake or verification failed.
    """
    try:
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_OPTIONAL
        # The context manager closes the socket on every path.
        with ctx.wrap_socket(socket.socket(), server_hostname=hostname) as s:
            s.settimeout(timeout)
            s.connect((hostname, port))
            return s.getpeercert() or {}
    except Exception:
        # Best effort: any failure is reported as "no certificate".
        return {}
def host_cert(hostname, port=443):
    """Try successive protocol versions until one yields a certificate.

    Protocols are attempted in the order TLS 1.2, TLS 1.1, TLS 1.0, SSLv3,
    SSLv23, SSLv2.

    Args:
        hostname: Host to connect to.
        port: TCP port to connect to.

    Returns:
        ``(label, cert)`` for the first protocol that produced a certificate,
        where ``label`` is one of ``'tls1.2'``, ``'tls1.1'``, ``'tls1'``,
        ``'ssl3'``, ``'ssl23'``, ``'ssl2'``; ``(None, None)`` if none did.
    """
    # Keep each constant next to its label instead of two parallel lists.
    attempts = (
        ('TLSv1_2_METHOD', 'tls1.2'),
        ('TLSv1_1_METHOD', 'tls1.1'),
        ('TLSv1_METHOD', 'tls1'),
        ('SSLv3_METHOD', 'ssl3'),
        ('SSLv23_METHOD', 'ssl23'),
        ('SSLv2_METHOD', 'ssl2'),
    )
    for method_name, label in attempts:
        # Skip methods the installed pyOpenSSL does not define rather than
        # failing before the first connection attempt.
        method = getattr(SSL, method_name, None)
        if method is None:
            continue
        try:
            cert = connect(method, hostname, port)
        except Exception:
            continue
        if cert is not None:
            return label, cert
    return None, None
def _read_der_length(data, pos):
    """Decode a DER length starting at ``data[pos]``; return ``(length, next_pos)``."""
    first = data[pos]
    pos += 1
    if first < 0x80:
        # Short form: the byte itself is the length.
        return first, pos
    count = first & 0x7F
    if count == 0 or pos + count > len(data):
        raise ValueError('unsupported or truncated DER length')
    # Long form: the next ``count`` bytes hold the big-endian length.
    return int.from_bytes(data[pos:pos + count], 'big'), pos + count


def extract_altsubjectname(data):
    """Extract the DNS names from a DER-encoded subjectAltName value.

    The value is walked as a SEQUENCE of tag/length/value items and only the
    items tagged 0x82 (dNSName) are returned. Splitting on the 0x82 and 0x20
    bytes, as done previously, corrupted names whose length byte happened to
    be 0x20 or 0x82 and merged non-DNS entries into the preceding name.

    Args:
        data: Raw extension bytes (as returned by ``X509Extension.get_data()``).

    Returns:
        A list of names decoded as UTF-8; a name that is not valid UTF-8 is
        returned as raw ``bytes``. Malformed or truncated input yields the
        names parsed up to that point.
    """
    names = []
    if not data:
        return names
    try:
        if data[0] != 0x30:  # not a SEQUENCE
            return names
        seq_len, pos = _read_der_length(data, 1)
        end = min(len(data), pos + seq_len)
        while pos < end:
            tag = data[pos]
            length, pos = _read_der_length(data, pos + 1)
            if pos + length > end:
                break
            value = bytes(data[pos:pos + length])
            pos += length
            if tag != 0x82 or not value:
                continue
            try:
                names.append(value.decode('utf8'))
            except UnicodeDecodeError:
                names.append(value)
    except (IndexError, ValueError):
        # Truncated or malformed encoding: keep whatever was parsed so far.
        pass
    return names
def cleanup(d):
    """Return a copy of ``d`` with byte strings decoded to text.

    Dict values go through ``cleanup_dict``, list values through
    ``cleanup_list`` and everything else through ``cleanup_str``, which
    leaves non-bytes values untouched. Previously any value that was not a
    dict, list or bytes (e.g. the date strings and the version number) was
    silently dropped from the result.

    Args:
        d: Mapping of certificate fields.

    Returns:
        A new dict with the same keys as ``d``.
    """
    r = {}
    for k, v in d.items():
        if isinstance(v, dict):
            r[k] = cleanup_dict(v)
        elif isinstance(v, list):
            r[k] = cleanup_list(v)
        else:
            r[k] = cleanup_str(v)
    return r
def cleanup_str(v):
    """Decode ``v`` to text if it is ``bytes``; return anything else as is.

    UTF-8 is tried first, then latin-1 (which accepts every byte value), the
    same fallback that ``cleanup_dict`` and ``cleanup_list`` apply.
    """
    if not isinstance(v, bytes):
        return v
    try:
        return v.decode('utf8')
    except UnicodeDecodeError:
        return v.decode('latin-1')
def cleanup_dict(v):
    """Return a copy of mapping ``v`` with ``bytes`` values decoded to text.

    Each bytes value is decoded as UTF-8, falling back to latin-1 when it is
    not valid UTF-8. Non-bytes values are copied unchanged.
    """
    r = {}
    for k, value in v.items():
        if isinstance(value, bytes):
            try:
                value = value.decode('utf8')
            except UnicodeDecodeError:
                # latin-1 maps every byte, so this cannot fail.
                value = value.decode('latin-1')
        r[k] = value
    return r
def cleanup_list(v):
    """Return a copy of list ``v`` with ``bytes`` items decoded to text.

    Each bytes item is decoded as UTF-8, falling back to latin-1 when it is
    not valid UTF-8. Non-bytes items are copied unchanged.
    """
    r = []
    # Use a distinct loop name; the original rebound the parameter itself.
    for item in v:
        if isinstance(item, bytes):
            try:
                item = item.decode('utf8')
            except UnicodeDecodeError:
                # latin-1 maps every byte, so this cannot fail.
                item = item.decode('latin-1')
        r.append(item)
    return r
def convert_issuer(issuer):
    """Map short X.509 name components to the long-form keys used here.

    Args:
        issuer: Dict keyed by the byte strings ``b'O'``, ``b'C'`` and
            ``b'CN'`` (as built from ``X509Name.get_components()``).

    Returns:
        A dict with the keys ``O_S``, ``C_S`` and ``CN_S``. Bytes values are
        decoded as UTF-8, falling back to latin-1 so the result is always
        text; a missing component becomes ``''``.
    """
    new_issuer = {}
    for short_name, long_name in ((b'O', O_S), (b'C', C_S), (b'CN', CN_S)):
        value = issuer.get(short_name, '')
        if isinstance(value, bytes):
            try:
                value = value.decode('utf8')
            except UnicodeDecodeError:
                # latin-1 maps every byte, so this cannot fail.
                value = value.decode('latin-1')
        new_issuer[long_name] = value
    return new_issuer
def convert_subject(subject):
    """Map a certificate subject to the long-form keys used here.

    Starts from ``convert_issuer(subject)`` and adds the locality (``L_S``)
    and state (``ST_S``) components.

    Args:
        subject: Dict keyed by short byte-string component names
            (``b'O'``, ``b'C'``, ``b'CN'``, ``b'L'``, ``b'ST'``).

    Returns:
        The converted dict; a missing component becomes ``''``.
    """
    new_subject = convert_issuer(subject)
    # The state must come from b'ST'; it was previously read from b'L',
    # duplicating the locality.
    for short_name, long_name in ((b'L', L_S), (b'ST', ST_S)):
        value = subject.get(short_name, '')
        if isinstance(value, bytes):
            try:
                value = value.decode('utf8')
            except UnicodeDecodeError:
                # latin-1 maps every byte, so this cannot fail.
                value = value.decode('latin-1')
        new_subject[long_name] = value
    return new_subject
def remap_pyopenssl_to_mine(hostname):
    """Fetch ``hostname``'s certificate via pyOpenSSL and flatten it.

    The result uses the same keys as ``get_certinfo``: ``notBefore``,
    ``notAfter``, ``serial_number``, ``issuer``, ``subject``,
    ``subjectAltName`` and ``version``.

    Args:
        hostname: Host whose certificate should be retrieved.

    Returns:
        The flattened certificate dict, or an empty dict when no certificate
        could be obtained.
    """
    _tlsv, cert = cert_issuer_on(hostname)
    cert_info = {}
    if cert is None:
        return cert_info

    asn1_time = "%Y%m%d%H%M%SZ"
    display_time = '%b %d %H:%M:%S %Y GMT'
    # notBefore must come from get_notBefore(); it was previously read from
    # get_notAfter(), so both dates were identical.
    not_before = datetime.strptime(cert.get_notBefore().decode('utf8'), asn1_time)
    not_after = datetime.strptime(cert.get_notAfter().decode('utf8'), asn1_time)
    cert_info['notBefore'] = not_before.strftime(display_time)
    cert_info['notAfter'] = not_after.strftime(display_time)
    cert_info['serial_number'] = str(cert.get_serial_number())
    cert_info['issuer'] = convert_issuer(dict(cert.get_issuer().get_components()))
    cert_info['subject'] = convert_subject(dict(cert.get_subject().get_components()))

    cert_info['subjectAltName'] = []
    for index in range(cert.get_extension_count()):
        ext = cert.get_extension(index)
        if ext.get_short_name().decode('utf8') != 'subjectAltName':
            continue
        data = ext.get_data()
        if data is not None:
            cert_info['subjectAltName'] = extract_altsubjectname(data)
        break

    cert_info['version'] = cert.get_version()
    return cert_info
def cert_issuer_on(hostname, port=443):
    """Return the ``(protocol_label, certificate)`` pair from ``host_cert``.

    The certificate element is ``None`` when ``host_cert`` found none.
    """
    protocol, certificate = host_cert(hostname, port)
    return protocol, certificate
def get_ssl_info(hostname):
    """Return ``(hostname, cert_info)`` using the pyOpenSSL-based lookup."""
    return hostname, remap_pyopenssl_to_mine(hostname)
def get_ssl_wwwinfo(hostname):
    """Look up the certificate for ``www.<hostname>`` via pyOpenSSL.

    Falls back to ``get_ssl_info`` for the bare hostname when the ``www.``
    lookup yields nothing.

    Returns:
        A ``(hostname, cert_info)`` tuple.
    """
    # The separator dot was missing ('www' + hostname), so the lookup
    # targeted e.g. "wwwexample.com" instead of "www.example.com".
    cert_info = remap_pyopenssl_to_mine('www.' + hostname)
    if not cert_info:
        return get_ssl_info(hostname)
    return hostname, cert_info
# CSV inputs; the hostname is read from the second comma-separated column.
T1M = '/home/dso/top-1m.csv'
T1M_TLD = '/home/dso/top-1m-TLD.csv'

# Scan results, populated by main().
top1m = []
top1m_certs = {}            # hostname -> certificate info
top1m_issuers = {}          # hostname -> issuer dict
top1m_issuers_cn = Counter()  # issuer common name -> number of hosts
top1m_issuers_ou = {}       # kept for compatibility; never populated here
top1m_issuers_o = Counter()   # issuer organization -> number of hosts
top1m_issuers_c = Counter()   # issuer country -> number of hosts
top1m_failed = set()        # hostnames for which no certificate was obtained
non_empty = {}              # hostname -> cleanup()'d certificate info


def _load_hostnames(path):
    """Read hostnames from the second CSV column of every line containing a dot."""
    with open(path) as handle:
        return [line.split(',')[1].strip() for line in handle if line.find('.') > 0]


def main():
    """Fetch certificates for every listed hostname and tally their issuers.

    Results are accumulated in the module-level containers above, and
    progress, failures and timing are printed to stdout.
    """
    for path in (T1M, T1M_TLD):
        top1m.extend(_load_hostnames(path))

    pool = Pool(processes=500)
    start_time = datetime.now()
    pos = 0
    try:
        for r in pool.imap_unordered(get_ssl_wwwinfo2, top1m):
            if len(r) != 2:
                break
            hostname, cert = r
            pos += 1
            if pos % 100000 == 0:
                print("Completed %d hosts (%s)" % (pos, hostname))
            if not cert:
                cur_time = datetime.now()
                print("Failed @ %d (%s) (Elapsed time: %s)" % (pos, hostname, (cur_time - start_time)))
                top1m_failed.add(hostname)
                continue
            issuer = cert['issuer']
            top1m_certs[hostname] = cert
            non_empty[hostname] = cleanup(cert)
            top1m_issuers[hostname] = issuer
            top1m_issuers_cn[issuer.get(CN_S, 'None')] += 1
            top1m_issuers_o[issuer.get(O_S, 'None')] += 1
            top1m_issuers_c[issuer.get(C_S, 'None')] += 1
        end_time = datetime.now()
    finally:
        # Always stop the workers, including after an early break or error.
        pool.terminate()
        pool.join()

    print("Start time: " + str(start_time))
    print("End time: " + str(end_time))
    print("Elapsed time: " + str(end_time - start_time))


# The guard keeps worker processes (which re-import this module under the
# "spawn" start method) from re-running the scan themselves.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment