Last active
June 16, 2020 08:33
-
-
Save rmja/8273fa0a629fcd01b1ff2f9309d26b56 to your computer and use it in GitHub Desktop.
authorized_keys maintenance for hcloud
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# hcloud-access.py | |
# Example: ./hcloud-access.py assign-key hcloud-ssh-key-name -u username server1 server2 | |
import os | |
import requests | |
import sys | |
import argparse | |
import toml | |
import pwd | |
def get_servers(headers): | |
r = requests.get("https://api.hetzner.cloud/v1/servers", headers=headers) | |
return r.json()["servers"] | |
def get_ssh_key(headers, name): | |
r = requests.get("https://api.hetzner.cloud/v1/ssh_keys", params= { "name": name }, headers=headers) | |
keys = r.json()["ssh_keys"] | |
return keys[0] if len(keys) == 1 else None | |
def append_public_key(hostname, user, public_key): | |
cmd = "ssh {}@{} 'umask 0077; mkdir -p ~/.ssh; grep -q -F \"{}\" ~/.ssh/authorized_keys 2> /dev/null || echo \"{}\" >> ~/.ssh/authorized_keys'".format(user, hostname, public_key, public_key) | |
os.system(cmd) | |
def remove_public_key(hostname, user, public_key): | |
cmd = "ssh {}@{} 'umask 0077; mkdir -p ~/.ssh; grep -v -F \"{}\" ~/.ssh/authorized_keys 2> /dev/null > ~/.ssh/authorized_keys.tmp; mv ~/.ssh/authorized_keys.tmp ~/.ssh/authorized_keys'".format(user, hostname, public_key) | |
os.system(cmd) | |
def get_headers(): | |
path = os.path.join(os.path.expanduser("~"), ".config", "hcloud", "cli.toml") | |
config = toml.load(path) | |
token = next(context["token"] for context in config["contexts"] if context["name"] == config["active_context"]) | |
headers = { | |
"Authorization": "Bearer " + token | |
} | |
return headers | |
def assign_key(): | |
parser = argparse.ArgumentParser("hcloud-access assign-key", description="add a public key to one or more servers") | |
parser.add_argument("keyname", help="the name of the key stored in hcloud") | |
parser.add_argument("--user", "-u", help="the user on the servers to which the key is assigned") | |
parser.add_argument("servers", nargs=argparse.REMAINDER, help="the name of the servers to which the key is assigned") | |
args = parser.parse_args(sys.argv[2:]) | |
if len(args.servers) == 0: | |
print("at least one server must be specified") | |
parser.print_help() | |
exit(1) | |
headers = get_headers() | |
key = get_ssh_key(headers, args.keyname) | |
if not key: | |
print("unable to find key '{}'".format(args.keyname)) | |
exit(1) | |
all_servers = get_servers(headers) | |
servers = [server for server in all_servers if server["name"] in args.servers] | |
if len(servers) != len(args.servers): | |
all_server_names = [server["name"] for server in all_servers] | |
for server in args.servers: | |
if server not in all_server_names: | |
print("uanble to find server '{}'".format(server)) | |
exit(1) | |
user = args.user if args.user else pwd.getpwuid(os.getuid()).pw_name | |
print("assigning key {} to user {} on the servers: {}".format(key["name"], user, ", ".join(args.servers))) | |
for server in servers: | |
ip = server["public_net"]["ipv4"]["ip"] | |
append_public_key(ip, user, key["public_key"]) | |
def remove_key(): | |
parser = argparse.ArgumentParser("hcloud-access remove-key", description="remove a public key from one or more servers") | |
parser.add_argument("keyname", help="the name of the key stored in hcloud") | |
parser.add_argument("--user", "-u", help="the user on the servers to which the key is assigned") | |
parser.add_argument("servers", nargs=argparse.REMAINDER, help="the name of the servers to which the key is assigned") | |
args = parser.parse_args(sys.argv[2:]) | |
if len(args.servers) == 0: | |
print("at least one server must be specified") | |
parser.print_help() | |
exit(1) | |
headers = get_headers() | |
key = get_ssh_key(headers, args.keyname) | |
if not key: | |
print("unable to find key '{}'".format(args.keyname)) | |
exit(1) | |
all_servers = get_servers(headers) | |
servers = [server for server in all_servers if server["name"] in args.servers] | |
if len(servers) != len(args.servers): | |
all_server_names = [server["name"] for server in all_servers] | |
for server in args.servers: | |
if server not in all_server_names: | |
print("uanble to find server '{}'".format(server)) | |
exit(1) | |
user = args.user if args.user else pwd.getpwuid(os.getuid()).pw_name | |
print("removing key {} from user {} on the servers: {}".format(key["name"], user, ", ".join(args.servers))) | |
for server in servers: | |
ip = server["public_net"]["ipv4"]["ip"] | |
remove_public_key(ip, user, key["public_key"]) | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser("hcloud-access", usage='''hcloud-access <command> | |
list of commands: | |
assign-key add a public key to one or more servers | |
''') | |
parser.add_argument("command", help="The command to run") | |
args = parser.parse_args(sys.argv[1:2]) | |
commands = { | |
"assign-key": assign_key, | |
"remove-key": remove_key | |
} | |
if not args.command in commands: | |
print("unrecognized command '{}'".format(args.command)) | |
parser.print_help() | |
exit(1) | |
commands[args.command]() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment