Created
March 23, 2022 00:49
-
-
Save chronos-tachyon/4ac048b55d98a72a2a04c2bb9410ccd7 to your computer and use it in GitHub Desktop.
Script to synchronize SSH keys across a fleet from a central command-and-control host; meant for use with a Wireguard VPN but others may find it useful to crib from.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# | |
# Written by Donald King <chronos@chronos-tachyon.net> | |
# Public Domain. | |
# | |
# ==== CC0 https://creativecommons.org/publicdomain/zero/1.0/ ==== | |
# [To the extent possible under law, I have waived all copyright ] | |
# [and related or neighboring rights to this script. This work is] | |
# [published from the United States of America. ] | |
import argparse | |
import collections | |
import json | |
import os | |
import pathlib | |
import pwd | |
import subprocess | |
import sys | |
import tempfile | |
DATA_DIR = pathlib.Path('/root/.ssh') | |
ID_DIR = DATA_DIR / 'id' | |
HOSTS_FILE = DATA_DIR / 'hosts.json' | |
RULES_FILE = DATA_DIR / 'rules.json' | |
GETPWNAM_CACHE = {} | |
class Rule(collections.namedtuple('Rule', ['sources', 'targets'])): | |
pass | |
class UserHost(collections.namedtuple('UserHost', ['user', 'host'])): | |
@property | |
def pwd(self): | |
return getpwnam(self.user) | |
def run(*argv): | |
argv = list(map(str, argv)) | |
print('+ {!r}'.format(argv), file=sys.stderr) | |
if not args.dry_run: | |
p = subprocess.run(argv) | |
if p.returncode != 0: | |
sys.exit(1) | |
def getpwnam(username): | |
pw = GETPWNAM_CACHE.get(username, None) | |
if not pw: | |
print('+ getpwnam {!r}'.format(username), file=sys.stderr) | |
pw = pwd.getpwnam(username) | |
GETPWNAM_CACHE[username] = pw | |
return pw | |
def installer_local(target, tmpfile): | |
homedir = pathlib.Path(target.pwd.pw_dir) | |
sshdir = homedir / '.ssh' | |
dstfile = sshdir / 'authorized_keys' | |
print('Installing: {}@{}'.format(target.user, target.host)) | |
run('cp', '-a', tmpfile, dstfile) | |
print() | |
def installer_remote(target, tmpfile): | |
homedir = pathlib.Path(target.pwd.pw_dir) | |
sshdir = homedir / '.ssh' | |
dstfile = sshdir / 'authorized_keys' | |
dsthost = '{}.vpn'.format(target.host) | |
dstremote = '{}:{}'.format(dsthost, dstfile) | |
print('Installing: {}@{}'.format(target.user, target.host)) | |
run('rsync', '-a', tmpfile, dstremote) | |
print() | |
def installer_offline(target, tmpfile): | |
print('Skipping {}@{}...'.format(target.user, target.host)) | |
print() | |
INSTALLERS = { | |
'local': installer_local, | |
'remote': installer_remote, | |
'offline': installer_offline, | |
} | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
'-n', | |
'--dry-run', | |
action='store_true', | |
help='do not act', | |
) | |
parser.add_argument( | |
'--only', | |
metavar='HOST', | |
default=[], | |
action='append', | |
help='list of hosts to sync; default is all hosts', | |
) | |
args = parser.parse_args() | |
with open(HOSTS_FILE, 'rt') as fp: | |
installers_by_host = json.load(fp) | |
rules = [] | |
with open(RULES_FILE, 'rt') as fp: | |
for row in json.load(fp): | |
rule_sources = [] | |
for srow in row['sources']: | |
source_user = srow['user'] | |
source_host = srow['host'] | |
source = UserHost(source_user, source_host) | |
rule_sources.append(source) | |
rule_targets = [] | |
for trow in row['targets']: | |
target_user = trow['user'] | |
target_host = trow['host'] | |
target = UserHost(target_user, target_host) | |
rule_targets.append(target) | |
rule = Rule(rule_sources, rule_targets) | |
rules.append(rule) | |
lines_by_source = {} | |
content_by_target = {} | |
for rule in rules: | |
partial = '' | |
for source in rule.sources: | |
line = lines_by_source.get(source, None) | |
if line is None: | |
keyfile0 = ID_DIR / '{}-at-{}.ed25519.pub'.format(source.user, source.host) | |
keyfile1 = ID_DIR / '{}-at-{}.rsa.pub'.format(source.user, source.host) | |
if keyfile0.exists(): | |
keyfile = keyfile0 | |
elif keyfile1.exists(): | |
keyfile = keyfile1 | |
else: | |
raise ValueError('no key for {}@{}'.format(source.user, source.host)) | |
with open(keyfile, 'rt') as fp: | |
line = fp.readline().rstrip('\r\n') + '\n' | |
lines_by_source[source] = line | |
partial += line | |
for target in rule.targets: | |
content_by_target.setdefault(target, '') | |
content_by_target[target] += partial | |
for target, content in content_by_target.items(): | |
if args.only and (target.host not in args.only): | |
continue | |
installer_name = installers_by_host.get(target.host, None) | |
installer_name = installer_name or 'offline' | |
installer = INSTALLERS[installer_name] | |
with tempfile.TemporaryDirectory() as tmpdir: | |
tmpdir = pathlib.Path(tmpdir) | |
tmpfile = tmpdir / 'authorized_keys' | |
with open(tmpfile, 'wt') as fp: | |
os.fchown(fp.fileno(), target.pwd.pw_uid, target.pwd.pw_gid) | |
os.fchmod(fp.fileno(), 0o644) | |
fp.write(content) | |
installer(target, tmpfile) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment