|
#!/usr/bin/python3 |
|
|
|
import argparse |
|
import json |
|
import grp |
|
import logging |
|
import os |
|
import pwd |
|
import subprocess |
|
import sys |
|
from urllib import request |
|
|
|
|
|
def load_json_url(url):
    """Fetch *url* and return its body parsed as JSON.

    The bytes returned by ``load_url`` are decoded as UTF-8 before being
    handed to the JSON parser.
    """
    raw = load_url(url)
    text = raw.decode('utf-8')
    return json.loads(text)
|
|
|
|
|
def load_url(url):
    """Return the raw body (bytes) of *url*.

    The response is used as a context manager so the underlying
    connection is always closed, including when the status check fails.

    Raises:
        RuntimeError: the response carries an HTTP status other than 200.
        urllib.error.URLError: propagated unchanged from ``urlopen``.
    """
    with request.urlopen(url) as resp:
        # Judge success by the numeric status, not by the free-form reason
        # phrase the server sends.  Responses for non-HTTP schemes
        # (data:, file:) carry no status at all and are accepted as-is.
        status = getattr(resp, 'status', None)
        if status is not None and status != 200:
            raise RuntimeError("Unable to load %s" % url)
        return resp.read()
|
|
|
|
|
def get_launchpad_user(uname):
    """Look up *uname* on Launchpad and collect the user's SSH keys.

    The profile document is fetched first; its
    ``sshkeys_collection_link`` is then followed to obtain the key
    entries.  Each key is rendered as ``ssh-<keytype> <keytext> <comment>``
    with the key type lower-cased.

    Returns a dict with ``username``, ``name`` (the profile's
    ``display_name``) and ``keys`` (list of key lines).
    """
    profile = load_json_url('https://api.launchpad.net/1.0/~%s' % uname)
    key_listing = load_json_url(profile['sshkeys_collection_link'])
    ssh_keys = [
        ' '.join(("ssh-" + entry['keytype'].lower(),
                  entry['keytext'], entry['comment']))
        for entry in key_listing['entries']
    ]
    return {
        'username': uname,
        'name': profile['display_name'],
        'keys': ssh_keys,
    }
|
|
|
|
|
def get_github_user(uname):
    """Look up *uname* on GitHub and collect the user's SSH keys.

    Two documents are fetched, for example:
        https://api.github.com/users/smoser
        https://api.github.com/users/smoser/keys

    Each key line is the published key followed by a comment of the form
    ``<uname>@github<key id>``.

    Returns a dict with ``username``, ``name`` (the profile's ``name``
    field) and ``keys`` (list of key lines).
    """
    base = 'https://api.github.com/users/' + uname
    profile = load_json_url(base)
    key_listing = load_json_url(base + "/keys")
    ssh_keys = [
        entry["key"] + " " + uname + "@github%d" % entry["id"]
        for entry in key_listing
    ]
    return {'username': uname, 'name': profile['name'], 'keys': ssh_keys}
|
|
|
|
|
def execute(cmd, dry):
    """Log *cmd* and, unless *dry* is true, run it.

    *cmd* is an argument list.  The command's stderr is folded into its
    stdout; on a non-zero exit status the return code, the command and
    the captured output are logged as errors and the
    ``CalledProcessError`` is re-raised.

    Returns 0 when the command succeeded or was skipped by dry-run.
    """
    logging.info("execute: " + ' '.join(cmd))

    if not dry:
        try:
            subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as err:
            logging.error("Command failed [%d]: %s", err.returncode, cmd)
            logging.error(err.output.decode())
            raise

    return 0
|
|
|
|
|
def is_group(name):
    """Return True when a group called *name* is in the group database.

    A failed lookup (``KeyError`` from ``grp.getgrnam``) yields False.
    """
    try:
        entry = grp.getgrnam(name)
    except KeyError:
        return False
    if entry:
        return True
|
|
|
|
|
def is_user(name):
    """Return True when a user called *name* is in the password database.

    A failed lookup (``KeyError`` from ``pwd.getpwnam``) yields False.
    """
    try:
        entry = pwd.getpwnam(name)
    except KeyError:
        return False
    if entry:
        return True
|
|
|
|
|
def auth_keys_file(user):
    """Return the path of *user*'s ``.ssh/authorized_keys`` file.

    For an existing account the home directory comes from expanding
    ``~<user>``; for an account that does not exist yet, ``/home/<user>``
    is assumed.
    """
    if is_user(user):
        home = os.path.expanduser("~" + user)
    else:
        home = "/home/%s" % user
    return os.path.join(home, ".ssh", "authorized_keys")
|
|
|
|
|
def ensure_group(group, dry=False):
    """Create *group* with ``groupadd`` unless it already exists.

    With *dry* set the command is only logged, not run.
    """
    if not is_group(group):
        execute(['groupadd', group], dry=dry)
|
|
|
|
|
def add_user(user, dry=False, groups=None, name=None):
    """Create the local account *user* with ``useradd``.

    Every group in *groups* is created first if it is missing; when
    *groups* is None a built-in list of supplementary groups is used.
    *name* becomes the account's comment field and falls back to *user*
    when empty.  The account gets ``/bin/bash`` as its shell and a home
    directory.  With *dry* set, commands are only logged.
    """
    if groups is None:
        groups = ['adm', 'audio', 'cdrom', 'dialout', 'dip', 'floppy', 'kvm',
                  'lxd', 'netdev', 'plugdev', 'sudo', 'video']

    for wanted in groups:
        ensure_group(wanted, dry)

    comment = name if name else user

    useradd_cmd = [
        'useradd', user,
        '--comment=%s' % comment,
        '--groups=%s' % ','.join(groups),
        '--shell=/bin/bash',
        '--create-home',
    ]
    execute(useradd_cmd, dry)
|
|
|
|
|
def add_sudo(user, dry=False):
    """Give *user* passwordless sudo through a drop-in under /etc/sudoers.d.

    The drop-in is named ``<user>-user`` with every ``.`` in the user name
    replaced by ``_``: sudo skips include-directory files whose name
    contains a dot, so a rule for e.g. ``john.doe`` would otherwise be
    silently ignored.  (Two users differing only by ``.`` vs ``_`` would
    share a file name.)  The rule itself always uses the unmodified name.

    The file is created (or truncated) and forced to mode 0440.  Modes are
    set explicitly instead of through the process umask, so the
    interpreter's umask is never touched.  A missing /etc/sudoers.d is
    created with mode 0755.

    With *dry* set, the intended write is only logged.
    """
    sudoers_d = "/etc/sudoers.d"
    sudoers_file = os.path.join(sudoers_d, user.replace(".", "_") + "-user")
    sudoers_line = user + " ALL=(ALL) NOPASSWD:ALL\n"

    logging.info("write to '%s': %s", sudoers_file, sudoers_line.rstrip())
    if dry:
        return

    if not os.path.isdir(sudoers_d):
        os.mkdir(sudoers_d, 0o755)
        # mkdir's mode is filtered by the umask; pin the final mode.
        os.chmod(sudoers_d, 0o755)

    fd = os.open(sudoers_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o440)
    with os.fdopen(fd, "w") as fp:
        # The create mode only applies to new files; fchmod also fixes up
        # a pre-existing file.
        os.fchmod(fd, 0o440)
        fp.write(sudoers_line)
|
|
|
|
|
def add_keys(user, keys=None, dry=False):
    """Append *keys* to *user*'s ``authorized_keys`` file.

    Nothing happens when *keys* is None or empty.  With *dry* set, the
    intended change is only logged.  Otherwise the ``.ssh`` directory is
    created if missing, each key is stripped and written on its own line,
    and both directory (when created) and file are handed to the user.

    Ownership uses the uid and *primary gid* from the user's passwd entry,
    so it also works on systems where no group named after the user
    exists.  A newly created key file gets mode 0600 via an explicit
    create mode; the process umask is left alone.

    Raises:
        KeyError: *user* has no passwd entry (non-dry runs only).
    """
    if not keys:
        return

    fpath = auth_keys_file(user)
    kdir = os.path.dirname(fpath)
    logging.info("Add %d keys to %s", len(keys), fpath)
    if dry:
        return

    content = '\n'.join(k.strip() for k in keys) + "\n"

    pw_entry = pwd.getpwnam(user)
    uid, gid = pw_entry.pw_uid, pw_entry.pw_gid

    if not os.path.isdir(kdir):
        os.mkdir(kdir, 0o755)
        os.chown(kdir, uid, gid)

    # O_APPEND keeps existing keys; 0o600 only applies when the file is new.
    fd = os.open(fpath, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o600)
    with os.fdopen(fd, "a") as fp:
        fp.write(content)
    os.chown(fpath, uid, gid)
|
|
|
|
|
def main(argv=None):
    """Command-line entry point: create a user and install remote SSH keys.

    *argv* is the argument list to parse; None (the default) means
    ``sys.argv[1:]``, so existing ``main()`` callers are unaffected.

    The remote site defaults to ``lp``.  A ``<site>:`` prefix on the local
    user name selects the site and is stripped from the name; a prefix on
    the remote user name, when given, takes precedence.  When no remote
    name is given, the local name is used for the lookup.

    Returns 0 on success.  Exits with status 1 when the site is unknown.
    """
    parser = argparse.ArgumentParser(
        description='Add a user with keys from launchpad or github.')

    parser.add_argument('--dry-run', action='store_true', default=False,
                        help='only report what would be done')
    parser.add_argument('--sudo', action='store_true', default=False,
                        help='give user passwordless sudo')
    parser.add_argument('--verbose', '-v', action='count', default=0)
    parser.add_argument('user', help='the local username')
    parser.add_argument('ruser', nargs='?', default=None,
                        help=('the launchpad username (default to lp:<user>).'
                              ' Format is lp:<user> or gh:<user> for github'))

    args = parser.parse_args(argv)

    # A dry run is only useful if the planned actions are visible.
    if args.dry_run:
        args.verbose += 1

    level = (logging.ERROR, logging.INFO, logging.DEBUG)[min(args.verbose, 2)]
    logging.basicConfig(stream=sys.stderr, level=level)

    rsite = "lp"
    if ":" in args.user:
        rsite, _, args.user = args.user.partition(":")

    if args.ruser is None:
        args.ruser = args.user
    elif ":" in args.ruser:
        rsite, _, args.ruser = args.ruser.partition(":")

    if rsite not in LOOKUPS:
        # Show a plain comma-separated list rather than a dict_keys repr.
        logging.error("Error: site '%s' not known (%s)",
                      rsite, ', '.join(sorted(LOOKUPS)))
        sys.exit(1)
    user = LOOKUPS[rsite](args.ruser)

    add_user(args.user, dry=args.dry_run, name=user['name'])

    if args.sudo:
        add_sudo(args.user, args.dry_run)

    add_keys(args.user, keys=user['keys'], dry=args.dry_run)
    return 0
|
|
|
|
|
# Site prefix accepted on the command line -> function that fetches the
# user's name and SSH keys from that site.
LOOKUPS = dict(
    gh=get_github_user,
    lp=get_launchpad_user,
)
|
|
|
|
|
# Run main() only when executed as a script; its return value becomes the
# process exit status.
if __name__ == '__main__':
    raise SystemExit(main())
|
|
|
# vi: ts=4 expandtab |