Created
November 17, 2022 09:30
-
-
Save alt-glitch/485c93a1648d461d38e49efe80d6e886 to your computer and use it in GitHub Desktop.
Sample script to query crt.sh for certificate transparency log data.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import psycopg2 | |
class CertificateTransparencyLog: | |
def __init__(self, base) -> None: | |
self.conn = psycopg2.connect( | |
host="crt.sh", | |
database="certwatch", | |
user="guest", | |
port="5432" | |
) | |
self.conn.autocommit = True | |
self.base = base | |
self.CAIDs = [] | |
query = """ | |
WITH ci AS ( | |
SELECT min(sub.CERTIFICATE_ID) ID, | |
min(sub.ISSUER_CA_ID) ISSUER_CA_ID, | |
array_agg(DISTINCT sub.NAME_VALUE) NAME_VALUES, | |
x509_commonName(sub.CERTIFICATE) COMMON_NAME, | |
x509_notBefore(sub.CERTIFICATE) NOT_BEFORE, | |
x509_notAfter(sub.CERTIFICATE) NOT_AFTER, | |
encode(x509_serialNumber(sub.CERTIFICATE), 'hex') SERIAL_NUMBER | |
FROM (SELECT * | |
FROM certificate_and_identities cai | |
WHERE plainto_tsquery('certwatch', %s) @@ identities(cai.CERTIFICATE) | |
AND cai.NAME_VALUE ILIKE ('%%' || %s || '%%') | |
AND NOT EXISTS ( | |
SELECT 1 | |
FROM certificate c2 | |
WHERE x509_serialNumber(c2.CERTIFICATE) = x509_serialNumber(cai.CERTIFICATE) | |
AND c2.ISSUER_CA_ID = cai.ISSUER_CA_ID | |
AND c2.ID < cai.CERTIFICATE_ID | |
AND x509_tbscert_strip_ct_ext(c2.CERTIFICATE) = x509_tbscert_strip_ct_ext(cai.CERTIFICATE) | |
LIMIT 1 | |
) | |
LIMIT 10000 | |
) sub | |
GROUP BY sub.CERTIFICATE | |
) | |
SELECT ci.ISSUER_CA_ID, | |
ca.NAME ISSUER_NAME, | |
ci.COMMON_NAME, | |
array_to_string(ci.NAME_VALUES, chr(10)) NAME_VALUE, | |
ci.ID ID, | |
le.ENTRY_TIMESTAMP, | |
ci.NOT_BEFORE, | |
ci.NOT_AFTER, | |
ci.SERIAL_NUMBER | |
FROM ci | |
LEFT JOIN LATERAL ( | |
SELECT min(ctle.ENTRY_TIMESTAMP) ENTRY_TIMESTAMP | |
FROM ct_log_entry ctle | |
WHERE ctle.CERTIFICATE_ID = ci.ID | |
) le ON TRUE, | |
ca | |
WHERE ci.ISSUER_CA_ID = ca.ID | |
ORDER BY le.ENTRY_TIMESTAMP DESC NULLS LAST; | |
""" | |
cur = self.conn.cursor() | |
cur.execute(query, (self.base, self.base)) | |
rows = cur.fetchall() | |
self.baseCertData = rows | |
self.numOfLeafCerts = len(rows) | |
cur.close() | |
def getNumOfLeafCerts(self): | |
return self.numOfLeafCerts | |
def getNumOfDistinctIssuers(self): | |
query = """ | |
WITH ci AS ( | |
SELECT min(sub.CERTIFICATE_ID) ID, | |
min(sub.ISSUER_CA_ID) ISSUER_CA_ID, | |
array_agg(DISTINCT sub.NAME_VALUE) NAME_VALUES, | |
x509_commonName(sub.CERTIFICATE) COMMON_NAME, | |
x509_notBefore(sub.CERTIFICATE) NOT_BEFORE, | |
x509_notAfter(sub.CERTIFICATE) NOT_AFTER, | |
encode(x509_serialNumber(sub.CERTIFICATE), 'hex') SERIAL_NUMBER | |
FROM (SELECT * | |
FROM certificate_and_identities cai | |
WHERE plainto_tsquery('certwatch', %s) @@ identities(cai.CERTIFICATE) | |
AND cai.NAME_VALUE ILIKE ('%%' || %s || '%%') | |
AND NOT EXISTS ( | |
SELECT 1 | |
FROM certificate c2 | |
WHERE x509_serialNumber(c2.CERTIFICATE) = x509_serialNumber(cai.CERTIFICATE) | |
AND c2.ISSUER_CA_ID = cai.ISSUER_CA_ID | |
AND c2.ID < cai.CERTIFICATE_ID | |
AND x509_tbscert_strip_ct_ext(c2.CERTIFICATE) = x509_tbscert_strip_ct_ext(cai.CERTIFICATE) | |
LIMIT 1 | |
) | |
LIMIT 10000 | |
) sub | |
GROUP BY sub.CERTIFICATE | |
) | |
SELECT ci.ISSUER_CA_ID, | |
ca.NAME ISSUER_NAME, | |
'' NAME_VALUE, | |
min(ci.ID) ID, | |
count(DISTINCT ci.ID) NUM_CERTS | |
FROM ci | |
LEFT JOIN LATERAL ( | |
SELECT min(ctle.ENTRY_TIMESTAMP) ENTRY_TIMESTAMP | |
FROM ct_log_entry ctle | |
WHERE ctle.CERTIFICATE_ID = ci.ID | |
) le ON TRUE, | |
ca | |
WHERE ci.ISSUER_CA_ID = ca.ID | |
GROUP BY ci.ISSUER_CA_ID, ISSUER_NAME | |
ORDER BY NUM_CERTS DESC, NAME_VALUE, ISSUER_NAME; | |
""" | |
cur = self.conn.cursor() | |
cur.execute(query, (self.base, self.base)) | |
rows = cur.fetchall() | |
self.numOfIssuers = len(rows) | |
return self.numOfIssuers | |
def getNumOfSubdomains(self): | |
self.subdomains = set() | |
for row in self.baseCertData: | |
commonName = row[2] | |
subjectAltName = row[3].split() | |
self.subdomains.add(commonName) | |
self.subdomains.update(subjectAltName) | |
return len(self.subdomains) | |
def getEarliestCert(self): | |
notBeforeTime = [] | |
for row in self.baseCertData: | |
unixTime = row[6].timestamp() | |
notBeforeTime.append(unixTime) | |
return(min(notBeforeTime)) | |
if __name__ == "__main__": | |
ctl = CertificateTransparencyLog("example.com") | |
print("[!] Gathered basic cert data") | |
print("[!] Number of leaf certificates: ", ctl.getNumOfLeafCerts()) | |
print("[!] Number of subdomains: ", ctl.getNumOfSubdomains()) | |
print("[!] Number of issuers: ", ctl.getNumOfDistinctIssuers()) | |
print("[!] Earliest Valid-From datetime: ", ctl.getEarliestCert()) | |
ctl.conn.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment