Proof of Concept of using Python+Async to connect to a server over SSH.
Requires:
- Python >= 3.6
msgpack
tornado<5
Run client.py
, assumes you're running ssh-server on localhost and that you can log in to it.
import asyncio | |
import sys | |
import msgpack | |
from pathlib import Path | |
class MsgPackProcProtocol(asyncio.SubprocessProtocol): | |
def __init__(self): | |
self._packer = msgpack.Packer(autoreset=True) | |
self._unpacker = msgpack.Unpacker(raw=False) | |
self.closed = asyncio.Future() | |
self.error_stream = sys.stderr.buffer | |
self._can_write = asyncio.Event() | |
self._can_write.clear() | |
def connection_made(self, transport): | |
print("connection_made", transport) | |
self._transport = transport | |
self._write_transport = self._transport.get_pipe_transport(0) | |
self._can_write.set() | |
def connection_lost(self, exc): | |
print("connection_lost", exc) | |
def pause_writing(self): | |
print("pause_writing") | |
self._can_write.clear() | |
def resume_writing(self): | |
print("resume_writing") | |
self._can_write.set() | |
def pipe_connection_lost(self, fd, exc): | |
print("pipe_connection_lost", fd, exc) | |
def pipe_data_received(self, fd, data): | |
print("pipe_data_received", fd, data) | |
if fd == 1: | |
self._unpacker.feed(data) | |
for msg in self._unpacker: | |
# TODO: Do something | |
print("\t", repr(msg)) | |
elif fd == 2: | |
self.error_stream.write(data) | |
# Apply line flushing | |
if b'\n' in data: | |
self.error_stream.flush() | |
def process_exited(self): | |
print("process_exited") | |
self.closed.set_result(True) | |
# TODO: Flush the unpacker? | |
async def send_message(self, message): | |
""" | |
Send a message. | |
""" | |
print("send_message", repr(message)) | |
data = self._packer.pack(message) | |
await self._can_write.wait() | |
print("\t", "Sending!") | |
self._write_transport.write(data) | |
class MsgPackConnection: | |
def __init__(self, protocol): | |
self._protocol = protocol | |
def closed(self): | |
return self._protocol.closed | |
async def send_message(self, message): | |
await self._protocol.send_message(message) | |
async def recv_message(self): | |
pass | |
async def connect_to_server(): | |
loop = asyncio.get_event_loop() | |
target = Path(__file__).resolve().parent / 'server.sh' | |
transport, protocol = await loop.subprocess_exec( | |
lambda: MsgPackProcProtocol(), | |
'ssh', 'localhost', str(target), | |
stdin=asyncio.subprocess.PIPE, | |
stdout=asyncio.subprocess.PIPE, | |
stderr=asyncio.subprocess.PIPE, | |
) | |
return MsgPackConnection(protocol) | |
async def main(): | |
conn = await connect_to_server() | |
await conn.send_message({"Hello": "World"}) | |
await conn.closed() | |
asyncio.get_event_loop().run_until_complete(main()) |
import sys | |
import os | |
import tornado.iostream | |
from tornado.ioloop import IOLoop | |
import msgpack | |
def make_stdio_raw(): | |
""" | |
Claim stdin/stdout as raw binary streams, and get them out of the way of | |
anything that might try to use them as text streams. | |
Redirects stdout to stderr, and sets stdin to /dev/null. | |
Returns new tornado PipeIOStream for input and output | |
""" | |
# Dissassemble all the connections, so that accidental references don't cause problems | |
# Detach text io, returning buffer; detach buffer, returning raw | |
sys.stdin.detach().detach() | |
sys.stdout.detach().detach() | |
# Mock in substitutes, so things don't break. | |
sys.stdin = open(os.devnull, 'wt') | |
sys.stdout = sys.stderr | |
# Do the same thing on a fd level | |
# Get the old stdin/out out of the way | |
fdin = os.dup(0) | |
fdout = os.dup(1) | |
# Get the new stdin in place | |
os.dup2(sys.stdin.fileno(), 0) | |
os.dup2(sys.stderr.fileno(), 1) | |
# Take fdin/out (the true stdin/out) and make tornado objects out of them. | |
input_stream = tornado.iostream.PipeIOStream(fdin) | |
output_stream = tornado.iostream.PipeIOStream(fdout) | |
return input_stream, output_stream | |
BUF_SIZE = 4 * 1024 | |
class MsgPackProcServer(object): | |
def __init__(self, recv_message): | |
""" | |
* recv_message: Callback for when a message is received, in the form: | |
recv_message(server, message) | |
""" | |
self._recv_message = recv_message | |
self._input, self._output = make_stdio_raw() | |
self._packer = msgpack.Packer(autoreset=True) | |
self._unpacker = msgpack.Unpacker(raw=False) | |
def listen(self): | |
self._input.set_close_callback(self._on_close) | |
self._output.set_close_callback(self._on_close) | |
self._input.read_until_close(streaming_callback=self._on_input) | |
def _on_input(self, data): | |
self._unpacker.feed(data) | |
for message in self._unpacker: | |
IOLoop.current().spawn_callback(self._recv_message, self, message) | |
def _on_close(self): | |
IOLoop.current().stop() | |
def send_message(self, message): | |
data = self._packer.pack(message) | |
return self._output.write(data) | |
if __name__ == "__main__": | |
server = MsgPackProcServer(lambda server, message: server.send_message(message)) | |
print("running") | |
server.listen() | |
IOLoop.instance().start() |
#!/bin/sh -e | |
cd $(dirname $0) | |
exec /home/astraluma/.virtualenvs/salt/bin/python ./server.py |