Skip to content

Instantly share code, notes, and snippets.

@deeso
Last active July 28, 2018 09:48
Show Gist options
  • Save deeso/d64e91412388f7f2428a926e0b3c8ef5 to your computer and use it in GitHub Desktop.
Save deeso/d64e91412388f7f2428a926e0b3c8ef5 to your computer and use it in GitHub Desktop.
enumerate the top 1m sites SSL certificates.
import traceback
from datetime import datetime
import ssl
import socket
from OpenSSL import SSL
from multiprocessing import Pool
CN_S = 'commonName'
O_S = 'organizationName'
C_S = 'countryName'
ST_S = 'State'
L_S = 'localityName'
last_tb = None
def connect(version, hostname, port):
cert = None
s = None
try:
ctx = SSL.Context(version)
s = socket.socket()
s.connect((hostname, port))
conn = SSL.Connection(ctx, s)
conn.set_connect_state()
conn.do_handshake()
cert = conn.get_peer_certificate()
conn.close()
except:
if s is not None:
s.close()
return cert
def get_certinfo(ssl_cert):
cert = ssl_cert
cert_info = {}
if len(cert_info) == 0:
return cert_info
cert_info['notAfter'] = cert['notAfter']
cert_info['notBefore'] = cert['notBefore']
cert_info['serial_number'] = cert['serialNumber']
cert_info['issuer'] = dict([i[0] for i in cert['issuer']])
cert_info['subject'] = dict([i[0] for i in cert['subject']])
cert_info['subjectAltName'] = [i[1] for i in cert['subjectAltName'] if len(i) > 0]
cert_info['version'] = cert['version']
return cert_info
def get_ssl_info2(hostname):
cert_info = get_certinfo(host_cert2(hostname))
try:
if len(cert_info) == 0:
return get_ssl_wwwinfo(hostname)
except:
traceback.print_exc()
return hostname, cert_info
def get_ssl_wwwinfo2(hostname):
cert_info = get_certinfo(host_cert2('www.'+hostname))
if len(cert_info) == 0:
return get_ssl_info(hostname)
return hostname, cert_info
def host_cert2(hostname, port=443):
try:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_OPTIONAL
s = ctx.wrap_socket(socket.socket(), server_hostname=hostname)
s.connect((hostname, 443))
return s.getpeercert()
except:
pass
return {}
def host_cert(hostname, port=443):
versions = [SSL.TLSv1_2_METHOD, SSL.TLSv1_1_METHOD,
SSL.TLSv1_METHOD, SSL.SSLv3_METHOD,
SSL.SSLv23_METHOD, SSL.SSLv2_METHOD]
version_str = ['tls1.2', 'tls1.1',
'tls1', 'ssl3',
'ssl23', 'ssl2']
for v, vs in zip(versions, version_str):
try:
cert = connect(v, hostname, port)
if cert is not None:
return vs, cert
except:
pass
return None, None
def extract_altsubjectname(data):
d = data.replace(b'\x82', b' ').split(b' ')
r = []
for i in d:
if len(i) < 3:
continue
n = i[1:]
try:
r.append(n.decode('utf8'))
except:
r.append(n)
# p = 2
# while p < len(d):
# p += 1
# if p > len(d):
# break
# l = d[p]
# p += 1
# if p > len(d):
# break
# n = d[p:p+l]
# p += l
# if l == 0:
# continue
# if len(n) == 0:
# continue
# try:
# r.append(n.decode('utf8'))
# except:
# r.append(n)
return r
def cleanup(d):
r = {}
items = [(k, v) for k, v in d.items()]
for k, v in items:
if isinstance(v, dict):
r[k] = cleanup_dict(d[k])
elif isinstance(v, list):
r[k] = cleanup_list(d[k])
elif isinstance(v, bytes):
r[k] = cleanup_str(d[k])
return r
def cleanup_str(v):
t = v.decode('utf8') if isinstance(v, bytes) else v
return t
def cleanup_dict(v):
r = {}
for k in v:
try:
r[k] = v[k].decode('utf8') if isinstance(v[k], bytes) else v[k]
continue
except:
pass
try:
r[k] = v[k].decode('latin-1') if isinstance(v[k], bytes) else v[k]
except:
raise
return r
def cleanup_list(v):
r = []
for v in v:
try:
r.append(v.decode('utf8') if isinstance(v, bytes) else v)
continue
except:
pass
try:
r.append(v.decode('latin-1') if isinstance(v, bytes) else v)
except:
raise
return r
def convert_issuer(issuer):
new_issuer = {}
try:
new_issuer[O_S] = issuer[b'O'].decode('utf8')
except:
new_issuer[O_S] = issuer.get(b'O', '')
try:
new_issuer[C_S] = issuer[b'C'].decode('utf8')
except:
new_issuer[C_S] = issuer.get(b'C', '')
try:
new_issuer[CN_S] = issuer[b'CN'].decode('utf8')
except:
new_issuer[CN_S] = issuer.get(b'CN', '')
return new_issuer
def convert_subject(subject):
new_subject = convert_issuer(subject)
try:
new_subject[L_S] = subject[b'L'].decode('utf8')
except:
new_subject[L_S] = subject.get(b'L', '')
try:
new_subject[ST_S] = subject[b'L'].decode('utf8')
except:
new_subject[ST_S] = subject.get(b'L', '')
return new_subject
def remap_pyopenssl_to_mine(hostname):
tlsv, cert = cert_issuer_on(hostname)
cert_info = {}
if cert is None:
return cert_info
nbd = datetime.strptime(cert.get_notAfter().decode('utf8'), "%Y%m%d%H%M%SZ")
nad = datetime.strptime(cert.get_notAfter().decode('utf8'), "%Y%m%d%H%M%SZ")
cert_info['notBefore'] = nbd.strftime('%b %d %H:%M:%S %Y GMT')
cert_info['notAfter'] = nad.strftime('%b %d %H:%M:%S %Y GMT')
cert_info['serial_number'] = str(cert.get_serial_number())
cert_info['issuer'] = convert_issuer(dict(cert.get_issuer().get_components()))
cert_info['subject'] = convert_subject(dict(cert.get_subject().get_components()))
count = cert.get_extension_count()
exts = dict([(i, cert.get_extension(i)) for i in range(0, count)])
cert_info['subjectAltName'] = []
for v in exts.values():
if v.get_short_name().decode('utf8') == 'subjectAltName':
d = v.get_data()
if d is None:
break
cert_info['subjectAltName'] = extract_altsubjectname(d)
cert_info['version'] = cert.get_version()
return cert_info
def cert_issuer_on(hostname, port=443):
tlsv, cert = host_cert(hostname, port)
if cert is None:
return tlsv, None
return tlsv, cert
def get_ssl_info(hostname):
cert_info = remap_pyopenssl_to_mine(hostname)
return hostname, cert_info
def get_ssl_wwwinfo(hostname):
cert_info = remap_pyopenssl_to_mine('www'+hostname)
if cert_info is None or len(cert_info) == 0:
return get_ssl_info(hostname)
return hostname, cert_info
T1M = '/home/dso/top-1m.csv'
top1m = [i.split(',')[1].strip() for i in open(T1M).readlines() if i.find('.') > 0]
T1M = '/home/dso/top-1m-TLD.csv'
top1m = top1m + [i.split(',')[1].strip() for i in open(T1M).readlines() if i.find('.') > 0]
top1m_certs = {}
top1m_issuers = {}
top1m_issuers_cn = {}
top1m_issuers_ou = {}
top1m_issuers_o = {}
top1m_issuers_c = {}
top1m_failed = set()
pool = Pool(processes=500)
start_time = datetime.now()
futures = pool.imap_unordered(get_ssl_wwwinfo2, top1m)
pos = 0
non_empty = {}
for r in futures:
if len(r) != 2:
break
hostname, cert = r
pos += 1
if pos % 100000 == 0:
print ("Completed %d hosts (%s)" % (pos, hostname))
if cert is None or len(cert) == 0:
cur_time = datetime.now()
print ("Failed @ %d (%s) (Elapsed time: %s)" % (pos, hostname, (cur_time-start_time )))
top1m_failed.add(hostname)
continue
issuer = cert['issuer']
top1m_certs[hostname] = cert
if len(cert) > 0:
d = cleanup(cert)
non_empty[hostname] = cleanup(cert)
top1m_issuers[hostname] = issuer
cn_s = issuer.get(CN_S, 'None')
o_s = issuer.get(O_S, 'None')
c_s = issuer.get(C_S, 'None')
cn = top1m_issuers_cn[cn_s] if cn_s in top1m_issuers_cn else 0
o = top1m_issuers_o[o_s] if o_s in top1m_issuers_o else 0
c = top1m_issuers_c[c_s] if c_s in top1m_issuers_c else 0
top1m_issuers_cn[cn_s] = cn + 1
top1m_issuers_o[o_s] = o + 1
top1m_issuers_c[c_s] = c + 1
end_time = datetime.now()
print ("Start time: "+ str(start_time))
print ("End time: "+ str(end_time))
print ("Elapsed time: "+ str(end_time-start_time))
pool.terminate()
pool.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment