Skip to content

Instantly share code, notes, and snippets.

@teeberg
Last active January 9, 2018 06:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save teeberg/5dc13b1f0b229d8ba9e8e5ba98c3214c to your computer and use it in GitHub Desktop.
Save teeberg/5dc13b1f0b229d8ba9e8e5ba98c3214c to your computer and use it in GitHub Desktop.
`eb ssh` wrapper to connect to Elastic Beanstalk while verifying SSH host keys using the instance's console output
function eb() {
if [[ $# > 0 && $1 == "ssh" ]]; then
"$(which eb)" ssh -e "$(which ebssh.py)" "${@:2}"
else
"$(which eb)" "$@"
fi
}
#!/Users/jonas/.venvs/~bin/bin/python3
# coding=utf-8
import errno
import json
import re
import subprocess
import sys
from functools import wraps
from os import makedirs as os_makedirs
from os.path import dirname, exists, isdir
from pprint import pformat
import boto3
import yaml
from ebcli.operations.sshops import _get_ssh_file
with open('.elasticbeanstalk/config.yml') as fd:
eb_data = yaml.load(fd)
session = boto3.Session(profile_name=eb_data['global']['profile'],
region_name=eb_data['global']['default_region'])
ec2 = session.client('ec2')
def main(args):
ip_address = args[-1].split('@')[1]
instance_data = json.loads(get_instance_data(ip_address))
instance_id, keypair_name = instance_data
expected_fingerprints = aws_get_host_key_fingerprints(ip_address, instance_id)
hash_algorithm = next(iter(expected_fingerprints.values())).split(':')[0].lower()
actual_fingerprints = ssh_get_host_key_fingerprints(ip_address, hash_algorithm)
matching_fingerprints = dict(
(key, value)
for key, value
in expected_fingerprints.items()
if value in actual_fingerprints
)
if not matching_fingerprints:
raise RuntimeError(
'Did not find any matching fingerprints!\n'
'Expected: {}\n'.format(pformat(sorted(expected_fingerprints.values()))) +
'Actual: {}'.format(pformat(sorted(actual_fingerprints))))
print('{} matching fingerprints'.format(len(matching_fingerprints)))
cmd = [
'ssh',
'-i',
_get_ssh_file(keypair_name),
'-o',
'UserKnownHostsFile={}'.format(ssh_keyscan.cached_file_path_template.format(ip_address))
] + args
print(cmd)
p = subprocess.Popen(cmd, stdout=sys.stdout, stderr=sys.stderr, stdin=sys.stdin)
p.communicate()
return p.returncode
def local(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
print(cmd)
p = subprocess.Popen(cmd, stdout=stdout, stderr=stderr)
stdout, stderr = p.communicate()
if stdout is not None:
stdout = stdout.decode('utf-8')
if stderr is not None:
stderr = stderr.decode('utf-8')
if p.returncode != 0:
raise RuntimeError('Command failed: {}\n'.format(cmd) +
'Stdout: {}\n'.format(stdout) +
'Stderr: {}\n'.format(stderr))
return stdout, stderr
def cached_host_file(filename):
def wrapper(f):
@wraps(f)
def inner(ip_address, *args, **kwargs):
filepath = inner.cached_file_path_template.format(ip_address)
makedirs(dirname(filepath))
if exists(filepath):
with open(filepath, 'r') as fd:
contents = fd.read()
return contents
result = f(ip_address, *args, **kwargs)
with open(filepath, 'w') as fd:
fd.write(result)
return result
inner.cached_file_path_template = '/tmp/.ebssh/{{}}/{}'.format(filename)
return inner
return wrapper
def makedirs(name, mode=None):
try:
if mode is None:
os_makedirs(name)
else:
os_makedirs(name, mode)
except OSError as e:
# be happy if someone already created the path
if e.errno != errno.EEXIST:
raise
# but only if it's actually a directory
if not isdir(name):
raise
@cached_host_file('ssh-keyscan.txt')
def ssh_keyscan(ip_address):
print('Looking up SSH keys for host {}'.format(ip_address))
return local(['ssh-keyscan', '-H', ip_address])[0]
@cached_host_file('instance-data.txt')
def get_instance_data(ip_address):
print('Looking up instance ID for IP address {}...'.format(ip_address))
response = ec2.describe_instances(
Filters=[
{
'Name': 'ip-address',
'Values': [ip_address],
}
]
)
reservations = response['Reservations']
assert len(reservations) == 1, reservations
instances = reservations[0]['Instances']
assert len(instances) == 1, instances
instance = instances[0]
return json.dumps([
instance['InstanceId'],
instance['KeyName']
])
@cached_host_file('console-output.txt')
def get_console_output(ip_address, instance_id):
return ec2.get_console_output(InstanceId=instance_id)['Output']
def aws_get_host_key_fingerprints(ip_address, instance_id):
console_output = get_console_output(ip_address, instance_id)
pattern = (
r"Generating public/private ([^\s]+) key pair.[\r\n]+"
r"Your identification has been saved in /etc/ssh/[^\s]+\.[\r\n]+"
r"Your public key has been saved in /etc/ssh/[^\s]+\.pub\.\r?[\r\n]+"
r"The key fingerprint is:[\r\n]+"
r"([^\s]+) [^\s]+"
)
fingerprints = re.findall(pattern, console_output)
assert fingerprints, ('Could not find fingerprints, check {}'
.format(get_console_output.cached_file_path_template.format(ip_address)))
print('... {} fingerprints'.format(len(fingerprints)))
return dict(
(algo, 'MD5:{}'.format(fp) if not fp.startswith('SHA256') else fp)
for algo, fp in fingerprints
)
@cached_host_file('ssh-keygen.txt')
def ssh_keygen(ip_address, hash_algorithm):
host_keys_file = ssh_keyscan.cached_file_path_template.format(ip_address)
return local(['ssh-keygen', '-E', hash_algorithm, '-lf', host_keys_file])[0]
def ssh_get_host_key_fingerprints(ip_address, hash_algorithm='md5'):
ssh_keyscan(ip_address)
fingerprint_lines = ssh_keygen(ip_address, hash_algorithm).splitlines()
actual_fingerprints = [
line.split(' ')[1] for line in
fingerprint_lines
]
return actual_fingerprints
if __name__ == '__main__':
exit(main(sys.argv[1:]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment