Last active
August 29, 2015 13:58
-
-
Save victortrac/10430581 to your computer and use it in GitHub Desktop.
Scans a host, Route 53 zones, and/or BIND zone files for A and CNAME records, and inventories the SSL certificates found on those hosts.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
import argparse
import base64
import datetime
import hashlib
import json
import socket
import ssl
import sys
import textwrap
from contextlib import closing

import boto
import OpenSSL
class SetEncoder(json.JSONEncoder):
    """JSON encoder that serializes sets as sorted lists.

    The standard encoder rejects ``set`` objects; this subclass emits them as
    a sorted JSON array so the output is deterministic.  ``frozenset`` is
    treated the same way.  Every other type is deferred to the base class,
    which raises ``TypeError`` for anything it cannot encode.
    """

    def default(self, obj):
        """Return a JSON-serializable stand-in for *obj*."""
        if isinstance(obj, (set, frozenset)):
            # sorted() accepts any iterable, so no intermediate list is needed.
            return sorted(obj)
        return json.JSONEncoder.default(self, obj)
def DER_cert_to_PEM_cert(der_cert_bytes):
    """Convert a DER-encoded certificate to PEM text.

    Args:
        der_cert_bytes: The certificate as a byte string in binary DER form.

    Returns:
        The PEM form as a native string: the BEGIN/END CERTIFICATE markers
        around the base64 payload wrapped at 64 characters per line, ending
        with a newline.
    """
    PEM_HEADER = "-----BEGIN CERTIFICATE-----"
    PEM_FOOTER = "-----END CERTIFICATE-----"
    encoded = base64.standard_b64encode(der_cert_bytes)
    # standard_b64encode returns bytes.  On Python 3 that is not ``str``, and
    # the previous ``str.encode(<bytes>, ...)`` call raised TypeError there.
    # Decode only when needed so the result is a native string on 2 and 3.
    if not isinstance(encoded, str):
        encoded = encoded.decode('ascii')
    return (PEM_HEADER + '\n' +
            textwrap.fill(encoded, 64) + '\n' +
            PEM_FOOTER + '\n')
def get_r53_records():
    """Yield each distinct A/CNAME record name found in Route 53.

    Opens a Route 53 connection through boto, walks every hosted zone and
    every record in it, and collects the names of records whose type is
    ``A`` or ``CNAME``.  Names are de-duplicated through a set, so the
    iteration order is unspecified.
    """
    wanted_types = ['A', 'CNAME']
    with closing(boto.connect_route53()) as conn:
        names = {
            record.name
            for zone in conn.get_zones()
            for record in zone.get_records()
            if record.type in wanted_types
        }
    for name in names:
        yield name
def get_bindfile_records(path):
    """Yield the distinct fifth-field values of A/CNAME lines in a zone file.

    This is a deliberately naive parser: a line is considered only when it
    splits into exactly five tab-separated fields and the fourth field is
    ``A`` or ``CNAME``.  The fifth field, with any trailing ``;`` comment and
    surrounding whitespace removed, is collected.  Values are de-duplicated
    through a set, so the iteration order is unspecified.

    Args:
        path: Path of the zone file to read.

    Yields:
        Each distinct, non-empty fifth-field value.
    """
    # NOTE(review): in the usual "name ttl class type rdata" layout the fifth
    # field is the record's target (address / canonical name), not its owner
    # name -- confirm that scanning the target is what is intended.
    wanted_types = ('A', 'CNAME')
    targets = set()
    with open(path) as zone_file:
        # Iterate the file lazily instead of loading every line up front.
        for line in zone_file:
            fields = line.split('\t')
            if len(fields) != 5 or fields[3] not in wanted_types:
                continue
            # Drop a trailing ';' comment so it cannot end up in the host.
            target = fields[4].split(';', 1)[0].strip()
            if target:
                targets.add(target)
    for target in targets:
        yield target
def get_socket(timeout=2, server_hostname=None):
    """Create an unconnected TLS client socket with a timeout.

    The certificate is not verified (``CERT_NONE``): the caller only wants
    to read whatever certificate the peer presents.

    Args:
        timeout: Socket timeout in seconds, applied before wrapping so that
            it also bounds the connect and handshake.
        server_hostname: Optional name to send via SNI.  Ignored when the
            ``ssl`` module has no SNI support or no ``SSLContext``.

    Returns:
        An ``ssl.SSLSocket`` that has not been connected yet.
    """
    # we do this to set a timeout on the socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(timeout)

    if hasattr(ssl, 'SSLContext'):
        # Negotiate the best protocol both sides support instead of pinning
        # SSLv3, which is insecure and missing from many ssl builds.
        protocol = getattr(ssl, 'PROTOCOL_TLS_CLIENT', ssl.PROTOCOL_SSLv23)
        context = ssl.SSLContext(protocol)
        # check_hostname must be turned off before verify_mode can be
        # lowered to CERT_NONE.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        if not getattr(ssl, 'HAS_SNI', False):
            server_hostname = None
        return context.wrap_socket(s, server_hostname=server_hostname)

    # Fallback for interpreters that predate SSLContext.
    return ssl.wrap_socket(s,
                           ssl_version=ssl.PROTOCOL_SSLv23,
                           cert_reqs=ssl.CERT_NONE,
                           ca_certs=None)
def check_ssl_cert(host):
    """Connect to *host* on port 443 and summarize the certificate it sends.

    Args:
        host: Hostname or address to connect to.

    Returns:
        A dict with the keys ``cn``, ``serial_number``, ``issuer``, ``sha1``,
        ``md5`` and ``expiration`` (ISO 8601 string), or ``None`` when the
        connection or handshake fails or the peer sends no certificate.  In
        the ``None`` case a short reason is printed to stdout.
    """
    def _to_text(value):
        # pyOpenSSL hands back byte strings; on Python 3 they must be decoded
        # before being compared with str, parsed, or dumped as JSON.  On
        # Python 2 ``bytes is str`` and the value is returned untouched.
        if isinstance(value, bytes) and not isinstance(value, str):
            return value.decode('utf-8', 'replace')
        return value

    def _get_cn(components):
        # Keep only the commonName entries of an X509Name component list.
        return [_to_text(value) for name, value in components
                if _to_text(name) == 'CN']

    def _parse_datetime(datetime_string):
        return datetime.datetime.strptime(datetime_string, "%Y%m%d%H%M%SZ")

    s = get_socket()
    try:
        try:
            s.connect((host, 443))
        except socket.timeout:
            print('timeout.')
            return None
        except socket.gaierror:
            print('unresolvable.')
            return None
        except ssl.SSLError:
            # SSLError is a socket.error subclass; report it separately so a
            # failed handshake is not mislabelled as a refused connection.
            print('ssl handshake failed.')
            return None
        except socket.error:
            print('connection refused')
            return None
        dercert = s.getpeercert(True)
    finally:
        # Close on every path, including the early returns above.
        s.close()

    if not dercert:
        print('no certificate.')
        return None

    # Load the DER bytes directly; no PEM round trip is required.
    x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1,
                                           dercert)
    cert_info = {
        'cn': ", ".join(_get_cn(x509.get_subject().get_components())),
        'serial_number': x509.get_serial_number(),
        'issuer': ", ".join(_get_cn(x509.get_issuer().get_components())),
        'sha1': _to_text(x509.digest('sha1')),
        'md5': _to_text(x509.digest('md5')),
        'expiration': _parse_datetime(
            _to_text(x509.get_notAfter())).isoformat(),
    }
    return cert_info
def check_host(host, inventory=None):
    """Check one host's certificate and record it in the inventory.

    Prints ``"<host>: "`` followed by either the certificate summary or the
    failure reason printed by ``check_ssl_cert``.  On success the summary is
    stored in the inventory under its SHA-1 fingerprint, and *host* is added
    to that entry's ``hosts`` set, so hosts sharing a certificate are grouped.

    Args:
        host: Hostname or address to check.
        inventory: Dict mapping SHA-1 fingerprint to certificate summary.
            Defaults to the module-level ``certs_in_use`` dict, which the
            script entry point creates before calling this function.
    """
    if inventory is None:
        inventory = certs_in_use

    # Emit the prefix without a newline and flush it, so it is visible while
    # the (possibly slow) connection attempt is in progress.
    sys.stdout.write('{}: '.format(host))
    sys.stdout.flush()

    cert_info = check_ssl_cert(host)
    if not cert_info:
        return

    print('{}'.format(cert_info))
    # First sighting stores the summary; later sightings reuse the entry.
    entry = inventory.setdefault(cert_info['sha1'], cert_info)
    entry.setdefault('hosts', set()).add(host)
if __name__ == "__main__":
    # Command-line entry point: check the hosts named by the selected
    # sources, then print the certificate inventory as JSON.
    # The label is passed as ``description``; the first positional argument
    # of ArgumentParser is ``prog`` and would replace the program name in
    # the usage line.
    parser = argparse.ArgumentParser(description='dns ssl scanner')
    parser.add_argument('--host', default=None,
                        help="single host to check cert")
    parser.add_argument('--r53', default=False, action='store_true',
                        help="Query R53 for records")
    parser.add_argument('--bindfile', default=None,
                        help="path to bind file to parse")
    args = parser.parse_args()

    # At least one source of hosts is required.
    if not (args.host or args.r53 or args.bindfile):
        parser.print_help()
        sys.exit(1)

    # Module-level on purpose: check_host() records its results here.
    certs_in_use = {}

    if args.host:
        check_host(args.host)
    if args.r53:
        for host in get_r53_records():
            check_host(host)
    if args.bindfile:
        for host in get_bindfile_records(args.bindfile):
            check_host(host)

    print(json.dumps(certs_in_use, cls=SetEncoder, indent=4, sort_keys=True))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment