Skip to content

Instantly share code, notes, and snippets.

@AstraLuma
Created March 29, 2023 20:06
Show Gist options
  • Save AstraLuma/14a72a7918a345e5c8cf8bc9f9c2e1f2 to your computer and use it in GitHub Desktop.
Save AstraLuma/14a72a7918a345e5c8cf8bc9f9c2e1f2 to your computer and use it in GitHub Desktop.
Paramiko server based on socketserver
#!/usr/bin/env python3
import base64
import logging
import paramiko
from paramiko.agent import AgentServerProxy
from paramiko.sftp import SFTP_NO_SUCH_FILE
from paramikosocket import SocketServerInterface, run_server
user_key = paramiko.RSAKey(data=base64.decodebytes(
b"AAAAB3NzaC1yc2EAAAABIwAAAIEAyO4it3fHlmGZWJaGrfeHOVY7RWO3P9M7hp"
b"fAu7jJ2d7eothvfeuoRFtJwhUmZDluRdFyhFY/hFAh76PJKGAusIqIQKlkJxMC"
b"KDqIexkgHAfID/6mqvmnSJf0b5W8v5h2pI/stOSwTQ+pxVhwJ9ctYDhRSlF0iT"
b"UWT10hcuO4Ks8="
))
class EmptySFTPServer(paramiko.SFTPServerInterface):
"""
Just says that file doesn't exist for all operations.
"""
def list_folder(self, path: str):
path = self.canonicalize(path)
if path == '/':
return []
else:
return SFTP_NO_SUCH_FILE
def stat(self, path: str):
return SFTP_NO_SUCH_FILE
def lstat(self, path: str):
return SFTP_NO_SUCH_FILE
def open(self, path: str, flags: int, attr: paramiko.SFTPAttributes):
return SFTP_NO_SUCH_FILE
def remove(self, path: str):
return SFTP_NO_SUCH_FILE
def rename(self, oldpath: str, newpath: str):
return SFTP_NO_SUCH_FILE
def posix_rename(self, oldpath: str, newpath: str):
return SFTP_NO_SUCH_FILE
def mkdir(self, path: str, attr: paramiko.SFTPAttributes):
return SFTP_NO_SUCH_FILE
def rmdir(self, path: str):
return SFTP_NO_SUCH_FILE
def chattr(self, path: str, attr):
return SFTP_NO_SUCH_FILE
def symlink(self, target: str, path: str):
return SFTP_NO_SUCH_FILE
def readlink(self, path: str):
return SFTP_NO_SUCH_FILE
class BasicServer(SocketServerInterface):
host_keys = [paramiko.RSAKey(filename="test_rsa.key")]
agent: AgentServerProxy|None = None
def setup_transport(self, transport):
super().setup_transport(transport)
transport.set_subsystem_handler('sftp', paramiko.SFTPServer, EmptySFTPServer)
def get_banner(self):
return ("This is a banner\r\n", "en-US")
def get_allowed_auths(self, username):
print(f"get_allowed_auths {username=}")
return "password,publickey"
def check_auth_none(self, username):
print(f"check_auth_none {username=}")
return paramiko.AUTH_FAILED
def check_auth_password(self, username, password):
print(f"check_auth_password {username=} {password=}")
match (username, password):
case ("robey", "foo"):
return paramiko.AUTH_SUCCESSFUL
case _:
return paramiko.AUTH_FAILED
def check_auth_publickey(self, username, key):
global user_key
print(f"check_auth_publickey {username=} {key=}")
if username == "robey" and key == user_key:
return paramiko.AUTH_SUCCESSFUL
else:
return paramiko.AUTH_FAILED
def check_channel_request(self, kind, chanid):
print(f"check_channel_request {kind=} {chanid=}")
match kind:
case "session":
return paramiko.OPEN_SUCCEEDED
case _:
return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
def check_channel_shell_request(self, channel):
print(f"check_channel_shell_request {channel=} ({id(channel)})")
self.spawn_worker(shell_worker, channel)
return True
def check_channel_pty_request(
self, channel, term, width, height, pixelwidth, pixelheight, modes,
):
print(f"check_channel_pty_request {channel=} {term=} {width=} {height=} {pixelwidth=} {pixelheight=} {modes=}")
return True
def check_port_forward_request(self, address, port):
print(f"check_port_forward_request {address=} {port=}")
return False
def check_global_request(self, kind, msg):
print(f"check_global_request {kind=} {msg=}")
return False
def check_channel_exec_request(self, channel, command):
print(f"check_channel_shell_request {channel=} {command=}")
return False
def check_channel_subsystem_request(self, channel, name):
print(f"check_channel_subsystem_request {channel=} {name=}")
return super().check_channel_subsystem_request(channel, name)
def check_channel_env_request(self, channel, name, value):
print(f"check_channel_env_request {channel=} {name=} {value=}")
def check_channel_forward_agent_request(self, channel):
print(f"check_channel_forward_agent_request {channel=} ({id(channel)})")
self.agent = AgentServerProxy(channel.transport)
def connect():
self.agent.connect()
print("check_channel_forward_agent_request:connect:", self.agent.get_keys())
self.spawn_worker(connect)
return True
def __del__(self):
if self.agent is not None:
self.agent.close()
def null_worker(chan):
import time
time.sleep(30)
def shell_worker(chan):
chan.send("\r\n\r\nWelcome to my dorky little BBS!\r\n\r\n")
chan.send(
"We are on fire all the time! Hooray! Candy corn for everyone!\r\n"
)
chan.send("Happy birthday to Robot Dave!\r\n\r\n")
chan.send("Username: ")
f = chan.makefile("rU")
username = f.readline().strip("\r\n")
chan.send("\r\nI don't like you, " + username + ".\r\n")
chan.close()
if __name__ == '__main__':
logging.basicConfig(level='DEBUG')
run_server(BasicServer, ('127.0.0.1', 2222))
"""
Marries paramiko and socketserver into a useful thing.
"""
import socketserver
import threading
import typing
import paramiko
class SocketServerInterface(paramiko.ServerInterface):
"""
Paramiko ServerInterface with affordances for working with socketserver.
"""
host_keys: list[paramiko.PKey]
def get_keys(self) -> typing.Iterable[paramiko.PKey]:
"""
Generate the public keys for this server.
"""
yield from self.host_keys
def setup_transport(self, transport: paramiko.Transport):
"""
Called when the transport is created.
Override to add things like subsystems.
"""
for key in self.get_keys():
transport.add_server_key(key)
def spawn_worker(self, worker: typing.Callable, *pargs) -> None:
"""
Spawns a worker thread. Suggest using for channel handlers.
"""
# This gets overridden when ParamikoHandler constructs the server
raise RuntimeError("spawn_worker() called by paramiko server not spawned by ParamikoHandler.")
class _Channels(list):
"""
Self-reaping list of active channels.
"""
# FIXME: Find other times to reap
def append(self, thread):
self.reap()
super().append(thread)
def pop(self, index=-1):
self.reap()
return super().pop(index)
def pop_all(self):
self[:], result = [], self[:]
return result
def reap(self):
self[:] = (chan for chan in self if not chan.closed)
class ParamikoHandler(socketserver.BaseRequestHandler):
server: socketserver.ThreadingTCPServer
ServerClass = SocketServerInterface
_channels: _Channels
paramiko_server: SocketServerInterface
transport: paramiko.Transport
@classmethod
def for_server(cls, server: type[SocketServerInterface]) -> type['ParamikoHandler']:
"""
Produces a subclass that uses the given paramiko server.
"""
return type(f"{server.__name__}_Handler", (cls,), {'ServerClass': server})
def spawn_worker(self, worker, *pargs):
t = threading.Thread(target=worker, args=pargs)
t.daemon = self.server.daemon_threads
self.server._threads.append(t)
t.start()
def setup(self):
# At least one ref has to be kept for a channel to be active,
# but the actual app logic won't start until later, so we keep one.
self._channels = _Channels()
self.paramiko_server = self.ServerClass()
self.paramiko_server.spawn_worker = self.spawn_worker
self.transport = paramiko.Transport(self.request)
self.paramiko_server.setup_transport(self.transport)
# paramiko.Transport.load_server_moduli()
def handle(self):
self.transport.start_server(server=self.paramiko_server)
while True:
chan = self.transport.accept(30) # Self-servicing timeout
if chan is not None:
self._channels.append(chan)
else:
self._channels.reap()
def finish(self):
self.transport.close()
self._channels.reap()
def run_server(server: type[SocketServerInterface], bind: tuple[str, int]) -> typing.Never:
"""
Run the given paramiko server on the given port.
Blocks forever.
"""
handler_class = ParamikoHandler.for_server(server)
with socketserver.ThreadingTCPServer(bind, handler_class) as tt_server:
tt_server.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment