Skip to content

Instantly share code, notes, and snippets.

@Eugeny
Last active April 16, 2024 02:56
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Eugeny/56765a5ef3632f1da7d47dbafa621b8d to your computer and use it in GitHub Desktop.
Save Eugeny/56765a5ef3632f1da7d47dbafa621b8d to your computer and use it in GitHub Desktop.
Full-on SSH clone in asyncio

An "SSH" clone in Python asyncio

This is a demo on how to handle remote process execution in asyncio

Obviously don't use in production as there's no auth.

  • Supports starting processes and killing them
  • Streams stdin/out/err
  • Optional PTY mode with resizing support

How to use

  • Server binds on 127.0.0.1:8888 by default
  • python server.py
  • python client.py bash
#!/usr/bin/env python3
import asyncio
import base64
import fcntl
import json
import os
import sys
import struct
import signal
import tty
import termios
loop = asyncio.new_event_loop()
class Client:
async def connect(self):
self.reader, self.writer = await asyncio.open_connection('127.0.0.1', 8888)
self._reader_task = loop.create_task(self._run_reader())
loop.add_reader(0, self._pump_stdin)
if sys.stdout.isatty():
signal.signal(signal.SIGWINCH, self._on_sigwinch)
async def _run_reader(self):
while True:
try:
data = await self.reader.readuntil(b'\0')
except asyncio.streams.IncompleteReadError:
return
message = json.loads(data.strip(b'\0').decode())
self.handle_message(message)
def handle_message(self, message):
# print('message', message)
if message['type'] == 'data':
fd = {
'stdout': 1,
'stderr': 2,
}[message['stream']]
data = base64.b64decode(message['data'])
os.write(fd, data)
if message['type'] == 'exit':
sys.exit(message['code'])
async def send(self, message):
self.writer.write(json.dumps(message).encode() + b'\0')
await self.writer.drain()
async def run_until_exit(self):
await self._reader_task
async def close(self):
signal.signal(signal.SIGWINCH, signal.SIG_IGN)
try:
await self.send({'type': 'kill'})
except BrokenPipeError:
pass
self.writer.close()
await self.writer.wait_closed()
async def start(self, command):
message = {
'type': 'start',
'pty': False,
'term': os.environ.get('TERM', 'xterm'),
'command': sys.argv[1],
}
if sys.stdout.isatty():
message['pty'] = True
rows, cols = self._get_pty_size()
message['rows'] = rows
message['cols'] = cols
await self.send(message)
async def _update_pty_size(self):
rows, cols = self._get_pty_size()
await self.send({
'type': 'resize-pty',
'rows': rows,
'cols': cols,
})
def _get_pty_size(self):
return struct.unpack('hh', fcntl.ioctl(1, termios.TIOCGWINSZ, '1234'))[:2]
def _on_sigwinch(self, _sig, _stack):
loop.create_task(self._update_pty_size())
def _pump_stdin(self):
data = os.read(0, 1024)
if not data:
loop.create_task(self.send({
'type': 'close-stream',
'stream': 'stdin',
}))
else:
loop.create_task(self.send({
'type': 'data',
'stream': 'stdin',
'data': base64.b64encode(data).decode(),
}))
async def main():
tattr = termios.tcgetattr(0)
try:
tty.setcbreak(0, termios.TCSANOW)
await c.connect()
await c.start('ls')
await c.run_until_exit()
finally:
termios.tcsetattr(0, termios.TCSANOW, tattr)
c = Client()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
loop.run_until_complete(c.close())
#!/usr/bin/env python3
import asyncio
import base64
import json
import fcntl
import termios
import pty
import sys
import os
import struct
import threading
loop = asyncio.new_event_loop()
class Session:
def __init__(self, reader, writer):
self.reader = reader
self.writer = writer
self.pid = None
loop.create_task(self._run_reader())
async def _run_reader(self):
while True:
try:
data = await self.reader.readuntil(b'\0')
except asyncio.streams.IncompleteReadError:
return
message = json.loads(data.strip(b'\0').decode())
await self.handle_message(message)
async def send(self, message):
self.writer.write(json.dumps(message).encode() + b'\0')
try:
await self.writer.drain()
except (BrokenPipeError, ConnectionResetError):
await self.close()
async def close(self):
if self.pid:
os.kill(self.pid, 9)
self.writer.close()
try:
await self.writer.wait_closed()
except BrokenPipeError:
pass
def _pump_fd_out(self, fin, stream):
data = os.read(fin, 1024)
if not data:
return
loop.create_task(self.send({
'type': 'data',
'stream': stream,
'data': base64.b64encode(data).decode(),
}))
async def handle_message(self, message):
if message['type'] == 'start':
stdin_child, self.stdin_master = os.pipe()
self.stdout_master, stdout_child = os.pipe()
self.stderr_master, stderr_child = os.pipe()
os.set_inheritable(self.stdin_master, True) # for pty resizing
os.set_inheritable(stdin_child, True)
os.set_inheritable(stdout_child, True)
os.set_inheritable(stderr_child, True)
if message['pty']:
self.pid, fd = pty.fork()
self.stdin_master = self.stdout_master = fd
else:
self.pid = os.fork()
if self.pid == 0:
if not message['pty']:
os.dup2(stdin_child, 0)
os.dup2(stdout_child, 1)
os.dup2(stderr_child, 2)
os.execve('/bin/bash', ['bash', '-c', message['command']], {**os.environ, 'TERM': message.get('term', 'xterm')})
else:
if message['pty']:
self._resize_pty(message['rows'], message['cols'])
os.close(stdin_child)
os.close(stdout_child)
os.close(stderr_child)
loop.add_reader(self.stdout_master, self._pump_fd_out, self.stdout_master, 'stdout')
loop.add_reader(self.stderr_master, self._pump_fd_out, self.stderr_master, 'stderr')
def wait():
_, exitcode = os.waitpid(self.pid, 0)
self.pid = None
loop.create_task(self._cleanup_process(exitcode))
threading.Thread(target=wait).start()
if message['type'] == 'close-stream':
if message['stream'] == 'stdin':
os.close(self.stdin_master)
if message['type'] == 'data':
if message['stream'] == 'stdin':
data = base64.b64decode(message['data'])
os.write(self.stdin_master, data)
if message['type'] == 'resize-pty':
self._resize_pty(message['rows'], message['cols'])
if message['type'] == 'kill':
if self.pid:
os.kill(self.pid, 9)
def _resize_pty(self, rows, cols):
winsize = struct.pack("HHHH", rows, cols, 0, 0)
fcntl.ioctl(self.stdin_master, termios.TIOCSWINSZ, winsize)
async def _cleanup_process(self, exitcode):
loop.remove_reader(self.stdout_master)
loop.remove_reader(self.stderr_master)
await self.send({
'type': 'exit',
'code': exitcode,
})
async def main():
server = await asyncio.start_server(Session, '127.0.0.1', 8888)
async with server:
await server.serve_forever()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment