Created
October 14, 2013 03:07
-
-
Save ThatDevopsGuy/6970081 to your computer and use it in GitHub Desktop.
A (very outdated) replacement for ssh-keyscan
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# ssh-keyscan-aliases (version is in first parser line) | |
# Sebastian Weigand, February 2010 (and later modifications) | |
# Requires Python 2.5 - 2.x | x > 5, and a UNIX-like OS | |
# A smarter `ssh-keyscan` | |
from os import access, R_OK, putenv | |
from socket import getfqdn, gethostbyname, gethostbyname_ex, gaierror, herror | |
from subprocess import Popen, PIPE | |
from sys import stdin, stderr | |
from optparse import OptionParser | |
from re import finditer, escape | |
# Initial variable instantiation: | |
prog = 'ssh-keyscan:' # Because I dislike the './' it may prepend with argv[0] | |
badNamedHosts = [] | |
badScanHosts = [] | |
hostList = [] | |
# Override system PATH for our execution: | |
putenv('PATH', '/usr/bin:/bin:/usr/sbin:/sbin:') | |
# This must be a dictionary, as we need to compare all possible | |
# aliases of a host based on the host itself: | |
fqdns = {} | |
# ------------------------------------------------------------------------ | |
parser = OptionParser(usage="usage: %prog [options] <host1> <host2> ... <hostN>\nWhere <host> may be one of: fully-qualified domain name (FQDN), alias/nickname, or IP address\n\nExample: %prog -f hosts.txt\t\t- read 1 hostname per line from 'hosts.txt'\nExample: %prog -M 1 host1 \t\t- only process hostname and fqdn for host1\nExample: %prog -qC host1 host2 \t\t- ensure host1 and host2 have identical keys", version="%prog 3.4.1") | |
parser.add_option("-f", "--file", dest="file", | |
help = "read hostnames from FILE, use '-' for STDIN") | |
parser.add_option("-T", "--timeout", dest="timeout", default="1", | |
help = "specify keyscan timeout, default is 1 second") | |
parser.add_option("-l", "--long-hostname", dest="long", action="store_true", | |
help = "shorthand for -m2 -M3, impiles non-unique hostnames, but unique domain names") | |
parser.add_option("-M", "--maximum-subdomains", dest="domainEnd", default="2", type="int", | |
help = "specify Maximum number of subdomains to include in alias listing, default=2") | |
parser.add_option("-m", "--minimum-subdomains", dest="domainStart", default="1", type="int", | |
help = "specify minimum number of subdomains to include in alias listing, default=1") | |
parser.add_option("-c", "--check", dest="checkKeys", action="store_true", | |
help = "check every alias' SSH keys to ensure they are identical") | |
parser.add_option("-C", "--check-all", dest="checkAllKeys", action="store_true", | |
help = "check every host's SSH keys to ensure they are identical to every other host, implies -c") | |
parser.add_option("-q", "--quiet", dest="quiet", action="store_true", | |
help = "do not print SSH keys or aliases, useful with '-c' or to see if machines are online") | |
parser.add_option("-t", dest="type", | |
help = "this option is ignored, kept for legacy compatibility") | |
parser.add_option("-u", "--unique", dest="unique", action="store_true", | |
help = "only print results if the host(s) were not found in the master ssh_known_hosts file") | |
parser.add_option("-v", "--verbose", dest="verbose", action="store_true", | |
help = "print out what values are found when they are found") | |
parser.add_option("-d", "--debug", dest="debug", action="store_true", | |
help = "print out script logic decisions, implies -v") | |
(options, args) = parser.parse_args() | |
# ------------------------------------------------------------------------ | |
# We only process the rsa,dsa option: | |
if options.type: | |
print >> stderr, prog, 'warning: the "-t" option is ignored, update your records accordingly' | |
def uniq(seq): | |
# Does not preserve order: | |
return {}.fromkeys(seq).keys() | |
if options.debug and not options.verbose: | |
options.verbose = True | |
if options.checkAllKeys and not options.checkKeys: | |
options.checkKeys = True | |
if options.long: | |
options.domainStart = 2 | |
options.domainEnd = 3 | |
if options.domainStart > options.domainEnd: | |
print >> stderr, prog, 'warning: Your minimum number of subdomains is greater than your number of subdomains, adjusting...' | |
options.domainEnd = options.domainStart | |
# Because our array starts with index 0: | |
options.domainStart -= 1 | |
if options.file: | |
if options.file == '-': | |
hostList += stdin.read().splitlines() | |
elif access(options.file, R_OK): | |
f = open(options.file, 'r') | |
# We don't like empty lines: | |
for line in f.readlines(): | |
if len(line) > 1: | |
hostList.append(line.strip()) | |
f.close() | |
else: | |
parser.error("Could not read from file: " + options.file) | |
elif len(args) == 0: | |
hostList += stdin.read().splitlines() | |
hostList += args | |
hostList = sorted(uniq(hostList)) | |
# Remove any already scanned entries from the hostList if -u is passed: | |
if options.unique: | |
if access('/etc/ssh/ssh_known_hosts', R_OK): | |
file = open('/etc/ssh/ssh_known_hosts', 'r') | |
keys = file.read().splitlines() | |
file.close() | |
uniqueHosts = [] | |
for host in hostList: | |
found = False | |
for entry in keys: | |
if host in entry: | |
if options.debug: | |
print >> stderr, 'DEBUG: Found', host, 'in known_hosts as', entry, '\n' | |
found = True | |
if not found: | |
if options.verbose: | |
print >> stderr, prog, host, 'was not found in ssh_known_hosts, will scan...' | |
uniqueHosts.append(host) | |
elif options.verbose: | |
print >> stderr, prog, host, 'already exists in ssh_known_hosts, skipping...' | |
hostList = sorted(uniqueHosts) | |
if options.debug: | |
print >> stderr, 'DEBUG: hostList is now:', hostList | |
else: | |
parser.error("Could not read from /etc/ssh/ssh_known_hosts") | |
# ------------------------------------------------------------------------ | |
def printArray(array): | |
for item in array: | |
print >> stderr, item | |
print >> stderr, '' | |
def stringBuilder(array): | |
returnString = '' | |
for string in array: | |
if len(string) != 0: | |
returnString += string + ',' | |
return returnString[:-1] # One last evil comma | |
def getDomainAliases(host): | |
if options.debug: | |
print >> stderr, '\n\nDEBUG: Processing', host, '...' | |
print >> stderr, '------------------------------------------------------------' | |
fqdns[host] = [] | |
ex = gethostbyname_ex(host) | |
ips = ex[2] | |
# Process all the DNS resolution per associated IP address(es) of host: | |
if options.debug: | |
print >> stderr, 'DEBUG: Got IP addresses from gethostbyname_ex[2]:', ips | |
for ip in ips: | |
fqdns[host].append(getfqdn(ip).lower()) | |
if options.debug: | |
for fqdn in fqdns[host]: | |
print >> stderr, 'DEBUG: Got fqdn from getfqdn:', fqdn | |
# Adds hostList string, if different than DNS resolution: | |
if len(ex[1]) != 0: | |
for alias in ex[1]: | |
if options.debug: | |
print >> stderr, 'DEBUG: Got fqdn from gethostbyname_ex[1]:', alias | |
fqdns[host].append(alias) | |
# Adds domain aliases (not round-robin) for hostList (if different): | |
if ex[0] not in fqdns and ex[0] not in ips: | |
if options.debug: | |
print >> stderr, 'DEBUG: Got fqdn from gethostbyname_ex[0]:', ex[0] | |
fqdns[host].append(ex[0]) | |
# This new array will contain all the aliases, and versions of the host: | |
names = [] | |
for fqdn in fqdns[host]: | |
# Get the locations of all the '.'s in the fqdn: | |
periods = [match.start() for match in finditer(escape('.'), fqdn)] | |
if options.debug: | |
print >> stderr, 'DEBUG: Periods:',periods | |
def shortener(fqdn): | |
for i in range(options.domainStart, options.domainEnd): | |
if options.debug: | |
print >> stderr, 'DEBUG: Shortening', fqdn, 'to', fqdn[:periods[i]] | |
names.append(fqdn[:periods[i]]) | |
# We can't return more subdomains than we have in the fqdn: | |
if len(periods) == 0: | |
''' | |
We will (in general) only process a zero-length 'fqdn' if the fqdn was resolved | |
outside of DNS (like in /etc/hosts), where the "fully" qualified name does not | |
contain any periods. | |
''' | |
print >> stderr, prog, 'warning: No subdomains found for', fqdn, '(nonunique hostname?)' | |
if options.verbose: | |
print >> stderr, '\tThis probably occurs because you have hosts defined outside of DNS, like in /etc/hosts' | |
names.append(fqdn) | |
elif len(periods) + 1 <= options.domainEnd: | |
print >> stderr, prog, 'warning: There are not enough subdomains for your requested level of subdomains, using maximum value...' | |
if options.verbose: | |
print >> stderr, '\tYou requested', options.domainEnd, 'subdomains (up to, but not including the FQDN), yet', fqdn, 'only contains', (len(periods) +1), 'subdomains, or is equivalent to the FQDN.' | |
options.domainEnd= len(periods) | |
shortener(fqdn) | |
else: | |
shortener(fqdn) | |
finalArray = fqdns[host] + names + ips | |
if options.verbose: | |
print >> stderr, prog, 'Minimum subdomains:', options.domainStart + 1, 'Maximum subdomains:', options.domainEnd | |
print >> stderr, prog, 'Domain alias information for', host, 'is:', finalArray | |
# It's cheaper to uniq it once, rather than per each addition: | |
return stringBuilder(sorted(uniq(finalArray))) | |
# `ssh-keyscan` does not return useful output via STDERR: | |
def getRSAKey(host): | |
p = Popen("ssh-keyscan -T " + options.timeout + " -t rsa " + host + " 2> /dev/null", shell=True, stdout=PIPE) | |
key = p.communicate()[0] | |
return key[key.index('ssh-'):] | |
def getDSAKey(host): | |
p = Popen("ssh-keyscan -T " + options.timeout + " -t dsa " + host + " 2> /dev/null", shell=True, stdout=PIPE) | |
key = p.communicate()[0] | |
return key[key.index('ssh-'):] | |
# Used if -C (checkAllKeys) is True | |
masterKeysSet = False | |
masterRSAKey = None | |
masterDSAKey = None | |
masterHost = None | |
for host in hostList: | |
try: | |
aliases = getDomainAliases(host) | |
# If the host is invalid (<garbage>.com): | |
except gaierror: | |
badNamedHosts.append(host) | |
except KeyboardInterrupt: | |
print >> stderr, '\nAborted.' | |
exit(1) | |
# If the host is not processable by 'gethostbyname_ex': | |
# esp. if it is a null string: | |
except herror: | |
if options.verbose: | |
print >> stderr, prog, 'warning: Attempting to process something other than a host, skipping.' | |
else: | |
try: | |
rsaKey = getRSAKey(host) | |
dsaKey = getDSAKey(host) | |
# This will only compare the first host against the rest: | |
if options.checkAllKeys: | |
if not masterKeysSet: | |
masterRSAKey = rsaKey | |
masterDSAKey = dsaKey | |
masterHost = host | |
masterKeysSet = True | |
if options.verbose: | |
print >> stderr, 'Set master key to', host | |
elif masterRSAKey != rsaKey or masterDSAKey != dsaKey: | |
print >> stderr, prog, '[', host, ',', gethostbyname(host), '] does not match keys with [', masterHost, ',', gethostbyname(masterHost), ']' | |
# This will only compare the first alias against the rest: | |
if options.checkKeys: | |
for fqdn in sorted(uniq(fqdns[host])): | |
if options.verbose: | |
print >> stderr, prog, 'Testing keys for', fqdn, '...', | |
if getRSAKey(fqdn) != rsaKey or getDSAKey(fqdn) != dsaKey: | |
print >> stderr, '\t\t[FAIL]' | |
print >> stderr, '\n', prog, ': [', host, ',', gethostbyname(host), '] did not match keys with [', fqdn, ',', gethostbyname(fqdn), ']' | |
print >> stderr, 'Keys for [', host, ',', gethostbyname(host), '] :' | |
print >> stderr, rsaKey, dsaKey, '\n' | |
print >> stderr, 'Keys for [', fqdn, ',', gethostbyname(fqdn), '] :' | |
print >> stderr, getRSAKey(fqdn), getDSAKey(fqdn) | |
else: | |
print >> stderr, '\t\t[ OK ]' | |
else: | |
if getRSAKey(fqdn) != rsaKey or getDSAKey(fqdn) != dsaKey: | |
print >> stderr, '\n', prog, 'Testing FAILED, as [', host, ',', gethostbyname(host), '] did not match keys with [', fqdn, ',', gethostbyname(fqdn), ']\n' | |
except ValueError: | |
badScanHosts.append(host) | |
except KeyboardInterrupt: | |
print >> stderr, '\nAborted.' | |
exit(1) | |
else: | |
if not options.quiet: | |
print aliases, rsaKey, | |
print aliases, dsaKey | |
if len(badNamedHosts) != 0: | |
print >> stderr, prog, 'error: We could not find these hosts in DNS:' | |
printArray(sorted(badNamedHosts)) | |
if len(badScanHosts) != 0: | |
print >> stderr, prog, 'error: We found these hosts in DNS, but could not scan them:' | |
printArray(sorted(badScanHosts)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment