Skip to content

Instantly share code, notes, and snippets.

@alt-glitch
Created November 17, 2022 09:30
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save alt-glitch/485c93a1648d461d38e49efe80d6e886 to your computer and use it in GitHub Desktop.
Save alt-glitch/485c93a1648d461d38e49efe80d6e886 to your computer and use it in GitHub Desktop.
Sample script to query crt.sh for certificate transparency log data.
import psycopg2
class CertificateTransparencyLog:
def __init__(self, base) -> None:
self.conn = psycopg2.connect(
host="crt.sh",
database="certwatch",
user="guest",
port="5432"
)
self.conn.autocommit = True
self.base = base
self.CAIDs = []
query = """
WITH ci AS (
SELECT min(sub.CERTIFICATE_ID) ID,
min(sub.ISSUER_CA_ID) ISSUER_CA_ID,
array_agg(DISTINCT sub.NAME_VALUE) NAME_VALUES,
x509_commonName(sub.CERTIFICATE) COMMON_NAME,
x509_notBefore(sub.CERTIFICATE) NOT_BEFORE,
x509_notAfter(sub.CERTIFICATE) NOT_AFTER,
encode(x509_serialNumber(sub.CERTIFICATE), 'hex') SERIAL_NUMBER
FROM (SELECT *
FROM certificate_and_identities cai
WHERE plainto_tsquery('certwatch', %s) @@ identities(cai.CERTIFICATE)
AND cai.NAME_VALUE ILIKE ('%%' || %s || '%%')
AND NOT EXISTS (
SELECT 1
FROM certificate c2
WHERE x509_serialNumber(c2.CERTIFICATE) = x509_serialNumber(cai.CERTIFICATE)
AND c2.ISSUER_CA_ID = cai.ISSUER_CA_ID
AND c2.ID < cai.CERTIFICATE_ID
AND x509_tbscert_strip_ct_ext(c2.CERTIFICATE) = x509_tbscert_strip_ct_ext(cai.CERTIFICATE)
LIMIT 1
)
LIMIT 10000
) sub
GROUP BY sub.CERTIFICATE
)
SELECT ci.ISSUER_CA_ID,
ca.NAME ISSUER_NAME,
ci.COMMON_NAME,
array_to_string(ci.NAME_VALUES, chr(10)) NAME_VALUE,
ci.ID ID,
le.ENTRY_TIMESTAMP,
ci.NOT_BEFORE,
ci.NOT_AFTER,
ci.SERIAL_NUMBER
FROM ci
LEFT JOIN LATERAL (
SELECT min(ctle.ENTRY_TIMESTAMP) ENTRY_TIMESTAMP
FROM ct_log_entry ctle
WHERE ctle.CERTIFICATE_ID = ci.ID
) le ON TRUE,
ca
WHERE ci.ISSUER_CA_ID = ca.ID
ORDER BY le.ENTRY_TIMESTAMP DESC NULLS LAST;
"""
cur = self.conn.cursor()
cur.execute(query, (self.base, self.base))
rows = cur.fetchall()
self.baseCertData = rows
self.numOfLeafCerts = len(rows)
cur.close()
def getNumOfLeafCerts(self):
return self.numOfLeafCerts
def getNumOfDistinctIssuers(self):
query = """
WITH ci AS (
SELECT min(sub.CERTIFICATE_ID) ID,
min(sub.ISSUER_CA_ID) ISSUER_CA_ID,
array_agg(DISTINCT sub.NAME_VALUE) NAME_VALUES,
x509_commonName(sub.CERTIFICATE) COMMON_NAME,
x509_notBefore(sub.CERTIFICATE) NOT_BEFORE,
x509_notAfter(sub.CERTIFICATE) NOT_AFTER,
encode(x509_serialNumber(sub.CERTIFICATE), 'hex') SERIAL_NUMBER
FROM (SELECT *
FROM certificate_and_identities cai
WHERE plainto_tsquery('certwatch', %s) @@ identities(cai.CERTIFICATE)
AND cai.NAME_VALUE ILIKE ('%%' || %s || '%%')
AND NOT EXISTS (
SELECT 1
FROM certificate c2
WHERE x509_serialNumber(c2.CERTIFICATE) = x509_serialNumber(cai.CERTIFICATE)
AND c2.ISSUER_CA_ID = cai.ISSUER_CA_ID
AND c2.ID < cai.CERTIFICATE_ID
AND x509_tbscert_strip_ct_ext(c2.CERTIFICATE) = x509_tbscert_strip_ct_ext(cai.CERTIFICATE)
LIMIT 1
)
LIMIT 10000
) sub
GROUP BY sub.CERTIFICATE
)
SELECT ci.ISSUER_CA_ID,
ca.NAME ISSUER_NAME,
'' NAME_VALUE,
min(ci.ID) ID,
count(DISTINCT ci.ID) NUM_CERTS
FROM ci
LEFT JOIN LATERAL (
SELECT min(ctle.ENTRY_TIMESTAMP) ENTRY_TIMESTAMP
FROM ct_log_entry ctle
WHERE ctle.CERTIFICATE_ID = ci.ID
) le ON TRUE,
ca
WHERE ci.ISSUER_CA_ID = ca.ID
GROUP BY ci.ISSUER_CA_ID, ISSUER_NAME
ORDER BY NUM_CERTS DESC, NAME_VALUE, ISSUER_NAME;
"""
cur = self.conn.cursor()
cur.execute(query, (self.base, self.base))
rows = cur.fetchall()
self.numOfIssuers = len(rows)
return self.numOfIssuers
def getNumOfSubdomains(self):
self.subdomains = set()
for row in self.baseCertData:
commonName = row[2]
subjectAltName = row[3].split()
self.subdomains.add(commonName)
self.subdomains.update(subjectAltName)
return len(self.subdomains)
def getEarliestCert(self):
notBeforeTime = []
for row in self.baseCertData:
unixTime = row[6].timestamp()
notBeforeTime.append(unixTime)
return(min(notBeforeTime))
if __name__ == "__main__":
ctl = CertificateTransparencyLog("example.com")
print("[!] Gathered basic cert data")
print("[!] Number of leaf certificates: ", ctl.getNumOfLeafCerts())
print("[!] Number of subdomains: ", ctl.getNumOfSubdomains())
print("[!] Number of issuers: ", ctl.getNumOfDistinctIssuers())
print("[!] Earliest Valid-From datetime: ", ctl.getEarliestCert())
ctl.conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment