Instantly share code, notes, and snippets.
Created
August 27, 2022 14:12
-
Star
(0)
0
You must be signed in to star a gist -
Fork
(0)
0
You must be signed in to fork a gist
-
Save ChenyangGao/df6be18ddd5f6ce14e387ee742150c83 to your computer and use it in GitHub Desktop.
在Linux主机间共享ssh公钥
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# coding: utf-8 | |
__author__ = "ChenyangGao <https://chenyanggao.github.io>" | |
__version__ = (0, 1) | |
__all__ = ["login", "get_pubkey"] | |
# TODO: ssh may report a host-key conflict for the remote server; this may need
# to be detected. It can be fixed by running the following command:
#     ssh-keygen -R ${host}
from argparse import ArgumentParser | |
if __name__ == "__main__":
    # Build the command-line interface and parse it up front.
    parser = ArgumentParser(description="ssh 公钥相互分发")
    # Zero or more credential strings, collected into ``args.list``.
    parser.add_argument(
        "list",
        metavar="auth",
        nargs="*",
        help=(
            "请按 [username]:password[@[host]][:[port]] "
            "格式传入参数,不含外侧[和],[] 表示里面的内容可缺省"
        ),
    )
    # Flag: replace ~/.ssh/authorized_keys instead of appending to it.
    parser.add_argument(
        "-w",
        "--overwrite",
        action="store_true",
        help="对于 ~/.ssh/authorized_keys 文件:是,覆写; 否(默认),追加",
    )
    # Distribution mode; restricted to the three listed choices.
    parser.add_argument(
        "-t",
        "--type",
        choices=("1", "n", "n+1"),
        default="n+1",
        help="1: 把本地公钥发到远程机器;n: 远程机器互相添加各自公钥;n+1(默认): 既[1]又[n]",
    )
    args = parser.parse_args()
    if not args.list:
        # Nothing to do without credentials: print the help text and exit,
        # which is what the built-in -h option does.
        parser.print_help()
        parser.exit()
try: | |
from pexpect import spawn, TIMEOUT | |
except ImportError: | |
raise SystemExit(f"""请安装模块 pexpect | |
'{__import__('sys').executable}' -m pip install pexpect""") | |
def login(
    host: str = "localhost",
    port: str = "22",
    username: str = "",
    password: str = "",
) -> spawn:
    """Open an interactive ssh session under pexpect and log in.

    Spawns ``ssh [username@]host -p port``, accepts the host-key
    fingerprint prompt if it appears, and answers the password prompt.

    Args:
        host: Remote host name or address.
        port: Remote ssh port.
        username: Login name; when empty it is left out of the ssh command.
        password: Text sent in reply to the ``password:`` prompt.

    Returns:
        The pexpect child, read up to the "Last login:" banner.

    Raises:
        ValueError: "Can't handle" when none of the expected prompts shows
            up before the default expect timeout; "Can't login" when ssh
            prints "No route to host"; "wrong password" when the banner
            does not appear within 5 seconds of sending the password.
    """
    destination = f"{username}@{host}" if username else host
    child = spawn(f"ssh {destination} -p {port}")
    expect_exact = child.expect_exact
    try:
        try:
            n = expect_exact([
                "Last login:",
                "No route to host",
                "(yes/no/[fingerprint])?",
                "password:",
            ])
        except TIMEOUT as exc:
            raise ValueError("Can't handle") from exc
        if n == 1:
            raise ValueError("Can't login")
        if n == 2:
            child.sendline("yes")
            # Once the fingerprint is accepted the server may log us in
            # directly (index 0) instead of asking for a password.
            if expect_exact(["Last login:", "password:"]) == 0:
                n = 0
        if n != 0:
            child.sendline(password)
            try:
                expect_exact("Last login:", 5)
            except TIMEOUT as exc:
                raise ValueError("wrong password") from exc
    except BaseException:
        # Do not leak the ssh child process / pty when login fails.
        child.close()
        raise
    return child
def get_pubkey(session: spawn) -> bytes:
    """Return a public key read from ``~/.ssh`` in an open shell session.

    Lists ``~/.ssh/*.pub`` in the session; when the listing fails, runs
    ``ssh-keygen`` to create ``~/.ssh/id_rsa`` with an empty passphrase.
    Then prints the first ``*.pub`` file with ``cat`` and reads it back.

    Args:
        session: A logged-in pexpect child, as returned by ``login``.

    Returns:
        The stripped output line that follows the echoed ``cat`` command.
    """
    # NOTE(review): the terminal echo of this command line contains both
    # marker strings itself, so the match below may hit the echo rather
    # than the command's output — confirm against a real session.
    session.sendline("ls ~/.ssh/*.pub && echo '//😃' || echo '//😭'")
    if session.expect_exact(["//😃".encode(), "//😭".encode()]) == 1:
        keygen_cmd = "ssh-keygen -t rsa -N '' -f ~/.ssh/id_rsa -q"
        session.sendline(keygen_cmd)
        try:
            session.expect_exact("Overwrite (y/n)?", 1)
        except TIMEOUT:
            # No overwrite prompt within 1 second: nothing to answer.
            # (The original named ``pexpect.TIMEOUT``, but ``pexpect`` is
            # never bound in this file, so the timeout became a NameError.)
            pass
        else:
            # Decline to overwrite an existing private key.
            session.sendline("n")
    cat_cmd = "cat `ls ~/.ssh/*.pub | head -1`"
    session.sendline(cat_cmd)
    # Skip past the echoed command and the remainder of its line; the next
    # line is the key itself.
    session.expect_exact(cat_cmd)
    session.readline()
    return session.readline().strip()
if __name__ == "__main__": | |
from os import popen | |
from warnings import warn | |
def parse_auth(
    s: str,
    sep_main: str = "@",
    sep_sub: str = ":",
) -> dict:
    """Split a credential string into keyword arguments for ``login``.

    The string is cut at the first ``sep_main`` into a credentials part and
    an endpoint part; each part is then cut at its first ``sep_sub``.

    Args:
        s: Text shaped like ``username:password@host:port``; any piece may
            be missing.
        sep_main: Separator between credentials and endpoint.
        sep_sub: Separator inside each of the two parts.

    Returns:
        A dict with keys ``host``, ``port``, ``username`` and ``password``.
        An empty host becomes ``"localhost"`` and an empty port ``"22"``.
    """
    credentials, _, endpoint = s.partition(sep_main)
    username, _, password = credentials.partition(sep_sub)
    host, _, port = endpoint.partition(sep_sub)
    if not host:
        host = "localhost"
    if not port:
        port = "22"
    return dict(host=host, port=port, username=username, password=password)
def get_local_pubkey() -> bytes:
    """Return the output of ``ssh-keygen -y`` for ``~/.ssh/id_rsa`` as bytes.

    The command is run through a shell pipe; its standard output is
    stripped of surrounding whitespace and encoded.
    """
    pipe = popen("ssh-keygen -t rsa -N '' -f ~/.ssh/id_rsa -q -y")
    output = pipe.read()
    return output.strip().encode()
# Collect public keys, then push the combined list to every reachable host.
overwrite = args.overwrite
# Shell redirection used for authorized_keys: truncate or append.
redirect = b">" if overwrite else b">>"
include_local = args.type in ("1", "n+1")
collect_remote = args.type != "1"

sessions = []
pubkeys = [get_local_pubkey()] if include_local else []

# Phase 1: log in to each host and, unless in mode "1", read its public key.
for auth in args.list:
    login_kwargs = parse_auth(auth)
    try:
        session = login(**login_kwargs)
        # Recorded before the key is read, so a host whose key cannot be
        # read still receives the keys gathered from the others.
        sessions.append((auth, session))
        if collect_remote:
            pubkeys.append(get_pubkey(session))
    except Exception as exc:
        warn(f"Read Error :: {auth} :: {exc!r}")

# Phase 2: write all keys through a shell here-document on every session.
cmd = b"cat %s ~/.ssh/authorized_keys << EOF\n%s\nEOF\n" % (
    redirect,
    b"\n".join(pubkeys),
)
for auth, session in sessions:
    try:
        if overwrite:
            # Keep a backup copy before the file is overwritten.
            session.sendline("/usr/bin/env cp -f ~/.ssh/authorized_keys{,.old}")
        session.send(cmd)
        session.sendline("")
        # NOTE(review): the session is closed right after the here-document
        # is sent, without waiting for the remote shell to finish — confirm
        # the write reliably lands.
        session.close()
    except Exception as exc:
        warn(f"Write Error :: {auth} :: {exc!r}")
    else:
        print(f"Success :: {auth}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment