Skip to content

Instantly share code, notes, and snippets.

@rmja
Last active June 16, 2020 08:33
Show Gist options
  • Save rmja/8273fa0a629fcd01b1ff2f9309d26b56 to your computer and use it in GitHub Desktop.
Save rmja/8273fa0a629fcd01b1ff2f9309d26b56 to your computer and use it in GitHub Desktop.
authorized_keys maintenance for hcloud
#!/usr/bin/env python3
# hcloud-access.py
# Example: ./hcloud-access.py assign-key hcloud-ssh-key-name -u username server1 server2
import argparse
import os
import pwd
import shlex
import subprocess
import sys

import requests
import toml
def get_servers(headers):
    """Return every server of the hcloud project as a list of dicts.

    The Hetzner Cloud API paginates its list endpoints, so all pages are
    fetched and concatenated; reading only the first page would make servers
    beyond it look as if they did not exist.

    Args:
        headers: HTTP headers carrying the API bearer token.

    Returns:
        The ``servers`` entries of all response pages, in API order.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    servers = []
    page = 1
    while page is not None:
        r = requests.get(
            "https://api.hetzner.cloud/v1/servers",
            params={"page": page, "per_page": 50},
            headers=headers,
        )
        # Fail with the real HTTP error (e.g. 401) instead of a KeyError on
        # the error payload, which has no "servers" member.
        r.raise_for_status()
        body = r.json()
        servers.extend(body["servers"])
        # next_page is null on the last page; a missing meta/pagination
        # section is treated the same way so the loop always terminates.
        page = ((body.get("meta") or {}).get("pagination") or {}).get("next_page")
    return servers
def get_ssh_key(headers, name):
    """Look up an SSH key stored in hcloud by its name.

    Args:
        headers: HTTP headers carrying the API bearer token.
        name: name of the key to search for (sent as the ``name`` filter).

    Returns:
        The key dict when exactly one key is returned by the API, otherwise
        ``None``.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    r = requests.get(
        "https://api.hetzner.cloud/v1/ssh_keys",
        params={"name": name},
        headers=headers,
    )
    # Surface authentication/server errors directly rather than as a KeyError
    # on a response body that has no "ssh_keys" member.
    r.raise_for_status()
    keys = r.json()["ssh_keys"]
    return keys[0] if len(keys) == 1 else None
def append_public_key(hostname, user, public_key):
    """Add ``public_key`` to ``~/.ssh/authorized_keys`` of ``user`` on ``hostname``.

    The key is only appended when no line of the remote file already contains
    it. The work is done by running a small shell snippet on the remote host
    through ``ssh``.

    Args:
        hostname: host name or IP address to connect to.
        user: remote login name.
        public_key: the public key line to add.

    Raises:
        ValueError: if ``public_key`` is empty or only whitespace.
    """
    key = public_key.strip()
    if not key:
        raise ValueError("public_key must not be empty")

    # Quote the key for the *remote* shell; the local side uses an argument
    # list (no local shell), so neither user, host nor key can inject commands.
    quoted_key = shlex.quote(key)
    remote_cmd = (
        "umask 0077; mkdir -p ~/.ssh; "
        # "--" keeps grep from treating the key as an option; printf avoids
        # echo's implementation-defined handling of backslashes.
        "grep -q -F -- {key} ~/.ssh/authorized_keys 2> /dev/null "
        "|| printf '%s\\n' {key} >> ~/.ssh/authorized_keys"
    ).format(key=quoted_key)

    result = subprocess.run(["ssh", "{}@{}".format(user, hostname), remote_cmd])
    if result.returncode != 0:
        # Report the failure but keep going so the remaining servers are
        # still processed by the caller.
        print(
            "ssh to {}@{} failed with exit status {}".format(
                user, hostname, result.returncode
            ),
            file=sys.stderr,
        )
def remove_public_key(hostname, user, public_key):
    """Remove ``public_key`` from ``~/.ssh/authorized_keys`` of ``user`` on ``hostname``.

    Every line of the remote file that contains the key is dropped. The
    filtered content is written to a temporary file which replaces the
    original only when the filter step succeeded.

    Args:
        hostname: host name or IP address to connect to.
        user: remote login name.
        public_key: the public key line to remove.

    Raises:
        ValueError: if ``public_key`` is empty or only whitespace (an empty
            pattern would match, and therefore delete, every line).
    """
    key = public_key.strip()
    if not key:
        raise ValueError("public_key must not be empty")

    # Quote the key for the *remote* shell; the local side uses an argument
    # list (no local shell), so neither user, host nor key can inject commands.
    quoted_key = shlex.quote(key)
    remote_cmd = (
        "umask 0077; "
        # Nothing to remove when the file does not exist.
        "test -f ~/.ssh/authorized_keys || exit 0; "
        "grep -v -F -- {key} ~/.ssh/authorized_keys > ~/.ssh/authorized_keys.tmp; "
        # grep exits 0 (lines kept) or 1 (no lines kept) on success and >1 on
        # error; only replace the file on success so a failed grep can never
        # wipe authorized_keys.
        "if test $? -le 1; "
        "then mv ~/.ssh/authorized_keys.tmp ~/.ssh/authorized_keys; "
        "else rm -f ~/.ssh/authorized_keys.tmp; exit 1; fi"
    ).format(key=quoted_key)

    result = subprocess.run(["ssh", "{}@{}".format(user, hostname), remote_cmd])
    if result.returncode != 0:
        # Report the failure but keep going so the remaining servers are
        # still processed by the caller.
        print(
            "ssh to {}@{} failed with exit status {}".format(
                user, hostname, result.returncode
            ),
            file=sys.stderr,
        )
def get_headers():
    """Build the HTTP headers for the hcloud API from the hcloud CLI config.

    Reads ``~/.config/hcloud/cli.toml`` and picks the token of the context
    whose ``name`` equals the configured ``active_context``.

    Returns:
        A dict with a single ``Authorization: Bearer <token>`` entry.

    Raises:
        SystemExit: if the active context is not present in the config file.
    """
    path = os.path.join(os.path.expanduser("~"), ".config", "hcloud", "cli.toml")
    config = toml.load(path)
    active = config.get("active_context")
    # Default of None avoids leaking a bare StopIteration when no context
    # matches (e.g. no active context selected yet).
    token = next(
        (
            context["token"]
            for context in config.get("contexts", [])
            if context["name"] == active
        ),
        None,
    )
    if token is None:
        raise SystemExit(
            "active hcloud context '{}' not found in {}".format(active, path)
        )
    return {"Authorization": "Bearer " + token}
def assign_key():
    """Handle the ``assign-key`` command.

    Parses ``sys.argv[2:]`` (key name, optional ``--user``/``-u`` and one or
    more server names), resolves the key and the servers through the hcloud
    API and appends the public key to the user's authorized_keys on every
    requested server. Exits with status 1 when the key or any server cannot
    be found; argparse itself exits when no server is given.
    """
    parser = argparse.ArgumentParser("hcloud-access assign-key", description="add a public key to one or more servers")
    parser.add_argument("keyname", help="the name of the key stored in hcloud")
    parser.add_argument("--user", "-u", help="the user on the servers to which the key is assigned")
    # nargs="+" instead of argparse.REMAINDER: REMAINDER swallows everything
    # after the key name, including "-u username", so the documented call
    # "assign-key <key> -u <user> <server>..." treated "-u" as a server name.
    parser.add_argument("servers", nargs="+", help="the name of the servers to which the key is assigned")
    args = parser.parse_args(sys.argv[2:])

    headers = get_headers()
    key = get_ssh_key(headers, args.keyname)
    if not key:
        print("unable to find key '{}'".format(args.keyname))
        sys.exit(1)

    servers_by_name = {server["name"]: server for server in get_servers(headers)}
    # Drop repeated names (keeping order) so a duplicate on the command line
    # neither aborts silently nor processes a server twice.
    requested = list(dict.fromkeys(args.servers))
    missing = [name for name in requested if name not in servers_by_name]
    if missing:
        for name in missing:
            print("unable to find server '{}'".format(name))
        sys.exit(1)

    user = args.user if args.user else pwd.getpwuid(os.getuid()).pw_name
    print("assigning key {} to user {} on the servers: {}".format(key["name"], user, ", ".join(requested)))
    for name in requested:
        # ipv4 may be absent/null for servers without a public IPv4 address.
        ipv4 = servers_by_name[name]["public_net"].get("ipv4")
        if not ipv4:
            print("server '{}' has no public IPv4 address, skipping".format(name))
            continue
        append_public_key(ipv4["ip"], user, key["public_key"])
def remove_key():
    """Handle the ``remove-key`` command.

    Parses ``sys.argv[2:]`` (key name, optional ``--user``/``-u`` and one or
    more server names), resolves the key and the servers through the hcloud
    API and removes the public key from the user's authorized_keys on every
    requested server. Exits with status 1 when the key or any server cannot
    be found; argparse itself exits when no server is given.
    """
    parser = argparse.ArgumentParser("hcloud-access remove-key", description="remove a public key from one or more servers")
    parser.add_argument("keyname", help="the name of the key stored in hcloud")
    parser.add_argument("--user", "-u", help="the user on the servers to which the key is assigned")
    # nargs="+" instead of argparse.REMAINDER: REMAINDER swallows everything
    # after the key name, including "-u username", so an option placed after
    # the key name was treated as a server name.
    parser.add_argument("servers", nargs="+", help="the name of the servers to which the key is assigned")
    args = parser.parse_args(sys.argv[2:])

    headers = get_headers()
    key = get_ssh_key(headers, args.keyname)
    if not key:
        print("unable to find key '{}'".format(args.keyname))
        sys.exit(1)

    servers_by_name = {server["name"]: server for server in get_servers(headers)}
    # Drop repeated names (keeping order) so a duplicate on the command line
    # neither aborts silently nor processes a server twice.
    requested = list(dict.fromkeys(args.servers))
    missing = [name for name in requested if name not in servers_by_name]
    if missing:
        for name in missing:
            print("unable to find server '{}'".format(name))
        sys.exit(1)

    user = args.user if args.user else pwd.getpwuid(os.getuid()).pw_name
    print("removing key {} from user {} on the servers: {}".format(key["name"], user, ", ".join(requested)))
    for name in requested:
        # ipv4 may be absent/null for servers without a public IPv4 address.
        ipv4 = servers_by_name[name]["public_net"].get("ipv4")
        if not ipv4:
            print("server '{}' has no public IPv4 address, skipping".format(name))
            continue
        remove_public_key(ipv4["ip"], user, key["public_key"])
if __name__ == "__main__":
    # Command dispatch: the first CLI argument selects the handler, which
    # then parses the remaining arguments (sys.argv[2:]) itself.
    commands = {
        "assign-key": assign_key,
        "remove-key": remove_key
    }
    # The usage text lists every entry of `commands`; remove-key was
    # previously missing from it.
    parser = argparse.ArgumentParser("hcloud-access", usage='''hcloud-access <command>
list of commands:
assign-key add a public key to one or more servers
remove-key remove a public key from one or more servers
''')
    parser.add_argument("command", help="The command to run")
    # Only look at argv[1] here so command-specific options are not rejected
    # by this top-level parser.
    args = parser.parse_args(sys.argv[1:2])
    if args.command not in commands:
        print("unrecognized command '{}'".format(args.command))
        parser.print_help()
        sys.exit(1)
    commands[args.command]()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment