Skip to content

Instantly share code, notes, and snippets.

@chronos-tachyon
Created March 23, 2022 00:49
Show Gist options
  • Save chronos-tachyon/4ac048b55d98a72a2a04c2bb9410ccd7 to your computer and use it in GitHub Desktop.
Save chronos-tachyon/4ac048b55d98a72a2a04c2bb9410ccd7 to your computer and use it in GitHub Desktop.
Script to synchronize SSH keys across a fleet from a central command-and-control host; meant for use with a Wireguard VPN but others may find it useful to crib from.
#!/usr/bin/env python3
#
# Written by Donald King <chronos@chronos-tachyon.net>
# Public Domain.
#
# ==== CC0 https://creativecommons.org/publicdomain/zero/1.0/ ====
# [To the extent possible under law, I have waived all copyright ]
# [and related or neighboring rights to this script. This work is]
# [published from the United States of America. ]
import argparse
import collections
import json
import os
import pathlib
import pwd
import subprocess
import sys
import tempfile
DATA_DIR = pathlib.Path('/root/.ssh')
ID_DIR = DATA_DIR / 'id'
HOSTS_FILE = DATA_DIR / 'hosts.json'
RULES_FILE = DATA_DIR / 'rules.json'
GETPWNAM_CACHE = {}
class Rule(collections.namedtuple('Rule', ['sources', 'targets'])):
pass
class UserHost(collections.namedtuple('UserHost', ['user', 'host'])):
@property
def pwd(self):
return getpwnam(self.user)
def run(*argv):
argv = list(map(str, argv))
print('+ {!r}'.format(argv), file=sys.stderr)
if not args.dry_run:
p = subprocess.run(argv)
if p.returncode != 0:
sys.exit(1)
def getpwnam(username):
pw = GETPWNAM_CACHE.get(username, None)
if not pw:
print('+ getpwnam {!r}'.format(username), file=sys.stderr)
pw = pwd.getpwnam(username)
GETPWNAM_CACHE[username] = pw
return pw
def installer_local(target, tmpfile):
homedir = pathlib.Path(target.pwd.pw_dir)
sshdir = homedir / '.ssh'
dstfile = sshdir / 'authorized_keys'
print('Installing: {}@{}'.format(target.user, target.host))
run('cp', '-a', tmpfile, dstfile)
print()
def installer_remote(target, tmpfile):
homedir = pathlib.Path(target.pwd.pw_dir)
sshdir = homedir / '.ssh'
dstfile = sshdir / 'authorized_keys'
dsthost = '{}.vpn'.format(target.host)
dstremote = '{}:{}'.format(dsthost, dstfile)
print('Installing: {}@{}'.format(target.user, target.host))
run('rsync', '-a', tmpfile, dstremote)
print()
def installer_offline(target, tmpfile):
print('Skipping {}@{}...'.format(target.user, target.host))
print()
INSTALLERS = {
'local': installer_local,
'remote': installer_remote,
'offline': installer_offline,
}
parser = argparse.ArgumentParser()
parser.add_argument(
'-n',
'--dry-run',
action='store_true',
help='do not act',
)
parser.add_argument(
'--only',
metavar='HOST',
default=[],
action='append',
help='list of hosts to sync; default is all hosts',
)
args = parser.parse_args()
with open(HOSTS_FILE, 'rt') as fp:
installers_by_host = json.load(fp)
rules = []
with open(RULES_FILE, 'rt') as fp:
for row in json.load(fp):
rule_sources = []
for srow in row['sources']:
source_user = srow['user']
source_host = srow['host']
source = UserHost(source_user, source_host)
rule_sources.append(source)
rule_targets = []
for trow in row['targets']:
target_user = trow['user']
target_host = trow['host']
target = UserHost(target_user, target_host)
rule_targets.append(target)
rule = Rule(rule_sources, rule_targets)
rules.append(rule)
lines_by_source = {}
content_by_target = {}
for rule in rules:
partial = ''
for source in rule.sources:
line = lines_by_source.get(source, None)
if line is None:
keyfile0 = ID_DIR / '{}-at-{}.ed25519.pub'.format(source.user, source.host)
keyfile1 = ID_DIR / '{}-at-{}.rsa.pub'.format(source.user, source.host)
if keyfile0.exists():
keyfile = keyfile0
elif keyfile1.exists():
keyfile = keyfile1
else:
raise ValueError('no key for {}@{}'.format(source.user, source.host))
with open(keyfile, 'rt') as fp:
line = fp.readline().rstrip('\r\n') + '\n'
lines_by_source[source] = line
partial += line
for target in rule.targets:
content_by_target.setdefault(target, '')
content_by_target[target] += partial
for target, content in content_by_target.items():
if args.only and (target.host not in args.only):
continue
installer_name = installers_by_host.get(target.host, None)
installer_name = installer_name or 'offline'
installer = INSTALLERS[installer_name]
with tempfile.TemporaryDirectory() as tmpdir:
tmpdir = pathlib.Path(tmpdir)
tmpfile = tmpdir / 'authorized_keys'
with open(tmpfile, 'wt') as fp:
os.fchown(fp.fileno(), target.pwd.pw_uid, target.pwd.pw_gid)
os.fchmod(fp.fileno(), 0o644)
fp.write(content)
installer(target, tmpfile)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment