Skip to content

Instantly share code, notes, and snippets.

@temoto
Last active March 30, 2016 21:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save temoto/6df2475825417a00851e to your computer and use it in GitHub Desktop.
Save temoto/6df2475825417a00851e to your computer and use it in GitHub Desktop.
Sync (overwrite) IAM public SSH keys to my servers at regular intervals. Update keys immediately on change is possible but make it yourself. CloudTrail-CloudWatch-{write this poll service}.
#!/usr/bin/env python3
import botocore.session
import distutils.spawn
import codecs
import grp, pwd
import os
import shutil
import subprocess
import sys
from datetime import datetime
def log(fmt, *args, **kwargs):
fmt = datetime.utcnow().replace(microsecond=0).isoformat() + '\t' + fmt + '\n'
sys.stderr.write(fmt.format(*args, **kwargs))
def get_ssh_keys(iam, username):
response = iam.list_ssh_public_keys(UserName=username)
assert response['IsTruncated'] == False
ids = [d['SSHPublicKeyId'] for d in response['SSHPublicKeys'] if d['Status'] == 'Active']
log('done list keys UserName={0} active ids={1}', username, ids)
lines = []
for kid in ids:
response = iam.get_ssh_public_key(UserName=username, SSHPublicKeyId=kid, Encoding='SSH')
body = response['SSHPublicKey']['SSHPublicKeyBody']
log('done get key UserName={0} KeyId={1} body=... {2}', username, kid, body[-50:])
if len(body.split()) < 3:
# add missing comment
body = '{0} AUTO-IAM-User={1}-KeyID={2}'.format(body, username, kid)
lines.append(body)
return '\n'.join(lines).encode()
def random_base64(n):
with open('/dev/urandom', 'rb') as f:
rnd = f.read(n)
return codecs.encode(rnd, 'base64').rstrip().decode()
def which(name, _cache={}):
if name in _cache:
return _cache[name]
path = '/usr/sbin:/usr/bin:/bin:' + os.environ.get('PATH', '')
found = distutils.spawn.find_executable(name, path)
if found is None:
log('error no executable for "{0}"', name)
raise KeyError
_cache[name] = found
return found
def groupadd(name):
try:
return grp.getgrnam(name)
except KeyError:
pass
args = [which('groupadd'), name]
log('exec {0}', args)
subprocess.call(args)
return grp.getgrnam(name)
def useradd(username):
try:
return pwd.getpwnam(username)
except KeyError:
pass
args = [which('useradd'), '--create-home', '--user-group', username]
log('exec {0}', args)
subprocess.call(args)
return pwd.getpwnam(username)
def usermod(username, unix_groups):
args = [
which('usermod'),
'--groups', ','.join(unix_groups),
'--shell', shell_path(),
username,
]
log('exec {0}', args)
subprocess.check_call(args)
def userdel(username):
args = [which('userdel'), username]
log('exec {0}', args)
subprocess.check_call(args)
def shell_path():
options = ('zsh', 'bash', 'sh')
for s in options:
try:
return which(s)
except KeyError:
pass
log('IMPORTANT no valid shell executable found')
raise KeyError
def user_add_mod(username, groups):
useradd(username)
unix_groups = ['iam']
if 'Full' in groups:
unix_groups.append('sudo')
usermod(username, unix_groups)
def user_set_keys(username, keys_content):
home_ssh_path = os.path.expanduser('~{0}/.ssh'.format(username))
log('.ssh path {0}', home_ssh_path)
os.makedirs(home_ssh_path, exist_ok=True)
shutil.chown(home_ssh_path, user=username, group=username)
os.chmod(home_ssh_path, 0o700)
home_keys_path = home_ssh_path + '/authorized_keys'
with open(home_keys_path, 'wb') as f:
f.write(keys_content)
shutil.chown(home_keys_path, user=username, group=username)
os.chmod(home_keys_path, 0o600)
log('done writing keys user={0}', username)
def reset_passwords(user_names):
if not user_names:
return
# set random passwords to skip PAM in sshd
args = [which('chpasswd')]
stdin = ''.join('{0}:{1}\n'.format(u, random_base64(20)) for u in user_names).encode()
log('exec {0}', args)
p = subprocess.Popen(args, stdin=subprocess.PIPE)
p.communicate(input=stdin, timeout=30)
log('done reset passwords')
def main():
log('starting')
session = botocore.session.get_session()
iam = session.create_client('iam')
response = iam.list_users()
assert response['IsTruncated'] == False
all_user_names = [d['UserName'] for d in response['Users']]
log('done list users, count={0}', len(all_user_names))
user_groups = {}
group_users = {}
for u in all_user_names:
response = iam.list_groups_for_user(UserName=u)
assert response['IsTruncated'] == False
groups = [d['GroupName'] for d in response['Groups']]
user_groups.setdefault(u, set()).update(frozenset(groups))
for g in groups:
group_users.setdefault(g, set()).add(u)
users_full = group_users.get('Full', set())
users_developer = group_users.get('Developer', set())
log('done group map, Full={0} Developer={1}', tuple(users_full), tuple(users_developer))
old_iam_users = groupadd('iam').gr_mem
users_valid = users_full | users_developer
all_full_keys = []
for u in users_valid:
keys_content = get_ssh_keys(iam, u)
if u in users_full and keys_content.strip():
all_full_keys.append(keys_content)
user_add_mod(u, user_groups.get(u, ()))
user_set_keys(u, keys_content)
user_set_keys('root', b'\n'.join(all_full_keys))
reset_passwords(users_valid | {'root'})
users_invalid = set(old_iam_users) - users_valid
for u in users_invalid:
log('IMPORTANT user "{0}" exist but not listed in IAM. Resolve manually.')
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
sys.exit(1)
[Unit]
Description=Sync IAM auth to server
After=network.target
[Service]
Type=simple
ExecStart=/usr/local/bin/aws-iam-ssh-keys.py
KillMode=process
Restart=on-failure
RestartSec=3m
[Unit]
Description=Sync IAM auth to server timer
After=network.target
[Timer]
OnBootSec=1min
OnUnitInactiveSec=37min
RandomizedDelaySec=2min
[Install]
WantedBy=timers.target
systemctl enable aws-iam-ssh-keys.timer
systemctl start aws-iam-ssh-keys.timer
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment