Skip to content

Instantly share code, notes, and snippets.

@AstraLuma
Last active December 10, 2018 01:52
Show Gist options
  • Save AstraLuma/17bb49283c2e0f5895d3567b9a8ce00f to your computer and use it in GitHub Desktop.
Save AstraLuma/17bb49283c2e0f5895d3567b9a8ce00f to your computer and use it in GitHub Desktop.
Python Async+SSH

Proof of concept of using Python's asyncio to exchange msgpack messages with a server process over SSH.

Requires:

  • Python >= 3.6
  • msgpack
  • tornado<5

Run client.py. It assumes an SSH server is running on localhost and that you can log in to it.

import asyncio
import sys
import msgpack
from pathlib import Path
class MsgPackProcProtocol(asyncio.SubprocessProtocol):
    """
    Subprocess protocol that exchanges msgpack messages with a child process.

    Outgoing messages are packed and written to the child's stdin pipe (fd 0).
    Bytes arriving on the child's stdout (fd 1) are fed to a streaming
    unpacker; bytes arriving on its stderr (fd 2) are copied to error_stream.

    Attributes:
    * closed: Future that is resolved with True when the child process exits.
    * error_stream: Binary stream receiving the child's stderr output.
    """

    def __init__(self):
        self._packer = msgpack.Packer(autoreset=True)
        self._unpacker = msgpack.Unpacker(raw=False)
        self.closed = asyncio.Future()
        self.error_stream = sys.stderr.buffer
        # A new Event starts out unset, so senders block until
        # connection_made() has stored the write transport.
        self._can_write = asyncio.Event()

    def connection_made(self, transport):
        """
        Remember the subprocess transport and its stdin pipe, then allow writes.
        """
        print("connection_made", transport)
        self._transport = transport
        self._write_transport = self._transport.get_pipe_transport(0)
        self._can_write.set()

    def connection_lost(self, exc):
        """
        Trace the loss of the connection; no other action is taken.
        """
        print("connection_lost", exc)

    def pause_writing(self):
        """
        Block further send_message() calls until resume_writing() is called.
        """
        print("pause_writing")
        self._can_write.clear()

    def resume_writing(self):
        """
        Let blocked send_message() calls proceed again.
        """
        print("resume_writing")
        self._can_write.set()

    def pipe_connection_lost(self, fd, exc):
        """
        Trace the closing of one of the child's pipes.
        """
        print("pipe_connection_lost", fd, exc)

    def pipe_data_received(self, fd, data):
        """
        Handle bytes from the child.

        * fd: 1 for the child's stdout, 2 for its stderr.
        * data: The raw bytes that were read.
        """
        print("pipe_data_received", fd, data)
        if fd == 1:
            # The unpacker buffers partial messages across calls and only
            # yields the ones that are complete.
            self._unpacker.feed(data)
            for msg in self._unpacker:
                # TODO: Do something
                print("\t", repr(msg))
        elif fd == 2:
            self.error_stream.write(data)
            # Apply line flushing
            if b'\n' in data:
                self.error_stream.flush()

    def process_exited(self):
        """
        Resolve the closed future once the child process has exited.
        """
        print("process_exited")
        # The future may already be done (e.g. cancelled together with the
        # task that was awaiting it); set_result() would then raise
        # InvalidStateError from inside the event loop callback.
        if not self.closed.done():
            self.closed.set_result(True)
        # TODO: Flush the unpacker?

    async def send_message(self, message):
        """
        Send a message.

        Packs the message, waits until writing is allowed, then writes the
        packed bytes to the child's stdin pipe.
        """
        print("send_message", repr(message))
        data = self._packer.pack(message)
        await self._can_write.wait()
        print("\t", "Sending!")
        self._write_transport.write(data)
class MsgPackConnection:
    """
    Thin facade that forwards connection operations to a protocol object.
    """

    def __init__(self, protocol):
        """
        * protocol: Object providing a `closed` attribute and an awaitable
          send_message(message) method.
        """
        self._protocol = protocol

    def closed(self):
        """
        Return the protocol's `closed` attribute.
        """
        closed_marker = self._protocol.closed
        return closed_marker

    async def send_message(self, message):
        """
        Hand the message to the protocol and wait for its send to finish.
        """
        pending_send = self._protocol.send_message(message)
        await pending_send

    async def recv_message(self):
        """
        Placeholder: receiving is not implemented; always returns None.
        """
        return None
async def connect_to_server():
    """
    Launch server.sh (the copy sitting next to this file) on localhost via ssh
    and return a MsgPackConnection wrapping the resulting protocol.
    """
    script_path = Path(__file__).resolve().parent / 'server.sh'
    command = ('ssh', 'localhost', str(script_path))
    event_loop = asyncio.get_event_loop()
    # All three standard streams are piped so the protocol can write to the
    # child's stdin and receive its stdout and stderr.
    _, proto = await event_loop.subprocess_exec(
        MsgPackProcProtocol,
        *command,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    return MsgPackConnection(proto)
async def main():
    """
    Connect to the server, send one greeting message, then wait on the
    connection's closed() awaitable.
    """
    connection = await connect_to_server()
    greeting = {"Hello": "World"}
    await connection.send_message(greeting)
    await connection.closed()
# Script entry point: drive main() to completion on the default event loop.
_loop = asyncio.get_event_loop()
_loop.run_until_complete(main())
import sys
import os
import tornado.iostream
from tornado.ioloop import IOLoop
import msgpack
def make_stdio_raw():
    """
    Claim stdin/stdout as raw binary streams, and get them out of the way of
    anything that might try to use them as text streams.

    Redirects stdout to stderr, and sets stdin to /dev/null.

    Returns new tornado PipeIOStream for input and output
    """
    # Disassemble all the connections, so that accidental references don't cause problems
    # Detach text io, returning buffer; detach buffer, returning raw
    sys.stdin.detach().detach()
    sys.stdout.detach().detach()

    # Mock in substitutes, so things don't break.
    # The stdin substitute is opened for reading: with a write-only handle any
    # stray read would raise instead of simply hitting end-of-file.
    sys.stdin = open(os.devnull, 'rt')
    sys.stdout = sys.stderr

    # Do the same thing on a fd level
    # Get the old stdin/out out of the way
    fdin = os.dup(0)
    fdout = os.dup(1)

    # Get the new stdin in place
    os.dup2(sys.stdin.fileno(), 0)
    os.dup2(sys.stderr.fileno(), 1)

    # Take fdin/out (the true stdin/out) and make tornado objects out of them.
    input_stream = tornado.iostream.PipeIOStream(fdin)
    output_stream = tornado.iostream.PipeIOStream(fdout)

    return input_stream, output_stream
# 4 KiB; presumably a buffer size, not referenced in the visible code.
BUF_SIZE = 4096
class MsgPackProcServer:
    """
    Msgpack message server that talks over this process's own stdin/stdout.
    """

    def __init__(self, recv_message):
        """
        * recv_message: Callback for when a message is received, in the form:
          recv_message(server, message)
        """
        self._recv_message = recv_message
        # Take over the real stdin/stdout as tornado streams.
        streams = make_stdio_raw()
        self._input, self._output = streams
        self._packer = msgpack.Packer(autoreset=True)
        self._unpacker = msgpack.Unpacker(raw=False)

    def listen(self):
        """
        Register the close handlers and start streaming data from the input.
        """
        for stream in (self._input, self._output):
            stream.set_close_callback(self._on_close)
        self._input.read_until_close(streaming_callback=self._on_input)

    def _on_input(self, data):
        """
        Feed a chunk of bytes to the unpacker and schedule the receive
        callback for every message that became complete.
        """
        self._unpacker.feed(data)
        for decoded in self._unpacker:
            IOLoop.current().spawn_callback(self._recv_message, self, decoded)

    def _on_close(self):
        """
        Stop the current IOLoop when either stream closes.
        """
        IOLoop.current().stop()

    def send_message(self, message):
        """
        Pack the message and write it to the output stream, returning whatever
        the stream's write() returns.
        """
        return self._output.write(self._packer.pack(message))
if __name__ == "__main__":
    def _echo(server, message):
        """Send every received message straight back."""
        return server.send_message(message)

    # Echo server: each incoming message is answered with the same message.
    server = MsgPackProcServer(_echo)
    print("running")
    server.listen()
    IOLoop.instance().start()
#!/bin/sh -e
# Run server.py from the directory that contains this script.
# The quoting keeps the cd working when the script path contains spaces or
# glob characters.
cd "$(dirname "$0")"
exec /home/astraluma/.virtualenvs/salt/bin/python ./server.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment