Skip to content

Instantly share code, notes, and snippets.

@ChenyangGao
Created August 27, 2022 14:12
Show Gist options
  • Save ChenyangGao/df6be18ddd5f6ce14e387ee742150c83 to your computer and use it in GitHub Desktop.
Save ChenyangGao/df6be18ddd5f6ce14e387ee742150c83 to your computer and use it in GitHub Desktop.
在Linux主机间共享ssh公钥
#!/usr/bin/env python3
# coding: utf-8
__author__ = "ChenyangGao <https://chenyanggao.github.io>"
__version__ = (0, 1)
__all__ = ["login", "get_pubkey"]
# TODO: 可能需要检测 ssh 时是否远程服务器冲突,可通过运行如下命令修复
# ssh-keygen -R ${host}
from argparse import ArgumentParser
if __name__ == "__main__":
parser = ArgumentParser(description="ssh 公钥相互分发")
parser.add_argument(
"list", metavar="auth", nargs="*",
help="请按 [username]:password[@[host]][:[port]] "
"格式传入参数,不含外侧[和],[] 表示里面的内容可缺省")
parser.add_argument(
"-w", "--overwrite", action="store_true",
help="对于 ~/.ssh/authorized_keys 文件:是,覆写; 否(默认),追加"
)
parser.add_argument(
"-t", "--type", choices=("1", "n", "n+1"), default="n+1",
help="1: 把本地公钥发到远程机器;n: 远程机器互相添加各自公钥;n+1(默认): 既[1]又[n]",
)
args = parser.parse_args()
if not args.list:
parser.parse_args(["-h"])
try:
from pexpect import spawn, TIMEOUT
except ImportError:
raise SystemExit(f"""请安装模块 pexpect
'{__import__('sys').executable}' -m pip install pexpect""")
def login(
host: str = "localhost",
port: str = "22",
username: str = "",
password: str = "",
) -> spawn:
if username:
child = spawn(f"ssh {username}@{host} -p {port}")
else:
child = spawn(f"ssh {host} -p {port}")
expect_exact = child.expect_exact
try:
n = expect_exact(["Last login:", "No route to host", "(yes/no/[fingerprint])?", "password:"])
except TIMEOUT:
raise ValueError(f"Can't handle")
if n == 0:
return child
if n == 1:
raise ValueError("Can't login")
elif n == 2:
child.sendline("yes")
expect_exact("password:")
child.sendline(password)
try:
expect_exact("Last login:", 5)
except TIMEOUT:
raise ValueError("wrong password")
return child
def get_pubkey(session: spawn) -> bytes:
session.sendline("ls ~/.ssh/*.pub && echo '//😃' || echo '//😭'")
if session.expect_exact(["//😃".encode(), "//😭".encode()]) == 1:
cmd = "ssh-keygen -t rsa -N '' -f ~/.ssh/id_rsa -q"
session.sendline(cmd)
try:
session.expect_exact("Overwrite (y/n)?", 1)
except pexpect.TIMEOUT:
pass
else:
session.sendline("n")
cmd = "cat `ls ~/.ssh/*.pub | head -1`"
session.sendline(cmd)
session.expect_exact(cmd)
session.readline()
return session.readline().strip()
if __name__ == "__main__":
from os import popen
from warnings import warn
def parse_auth(
s: str,
sep_main: str = "@",
sep_sub: str = ":",
) -> dict:
username_password, _, host_port = s.partition(sep_main)
username, _, password = username_password.partition(sep_sub)
host, _, port = host_port.partition(sep_sub)
return {
"host": host or "localhost", "port": port or "22",
"username": username, "password": password,
}
def get_local_pubkey() -> bytes:
return popen("ssh-keygen -t rsa -N '' -f ~/.ssh/id_rsa -q -y")\
.read().strip().encode()
auths = args.list
overwrite = args.overwrite
write = b">" if overwrite else b">>"
type_ = args.type
sessions = []
pubkeys = []
if type_ in ("1", "n+1"):
pubkeys.append(get_local_pubkey())
for auth in auths:
d_auth = parse_auth(auth)
try:
session = login(**d_auth)
sessions.append((auth, session))
if type_ != "1":
pubkey = get_pubkey(session)
pubkeys.append(pubkey)
except Exception as exc:
warn(f"Read Error :: {auth} :: {exc!r}")
pubkeys = b"\n".join(pubkey for pubkey in pubkeys)
cmd = b"cat %s ~/.ssh/authorized_keys << EOF\n%s\nEOF\n" % (write, pubkeys)
for auth, session in sessions:
try:
if overwrite:
session.sendline("/usr/bin/env cp -f ~/.ssh/authorized_keys{,.old}")
session.send(cmd)
session.sendline("")
session.close()
except Exception as exc:
warn(f"Write Error :: {auth} :: {exc!r}")
else:
print(f"Success :: {auth}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment