Skip to content

Instantly share code, notes, and snippets.

@kissgyorgy
Last active December 24, 2023 16:35
Show Gist options
  • Save kissgyorgy/9e58881131aeea51ed0a2c8bbffbdba4 to your computer and use it in GitHub Desktop.
Save kissgyorgy/9e58881131aeea51ed0a2c8bbffbdba4 to your computer and use it in GitHub Desktop.
Python: HTTP over SSH
import asyncio
import sys
from typing import Optional
import asyncssh
import httpx
class MySSHTCPSession(asyncssh.SSHTCPSession):
    """TCP session that collects every received chunk in ``_buffer``."""

    def __init__(self):
        # Chunks are stored in arrival order.
        self._buffer = []

    def data_received(self, data: bytes, datatype: asyncssh.DataType) -> None:
        """Store one chunk of incoming data; ``datatype`` is not inspected."""
        self._buffer.append(data)

    def connection_lost(self, exc: Optional[Exception]) -> None:
        """Report the error on stderr when the connection ended with one."""
        if not exc:
            return
        print("Direct connection error:", str(exc), file=sys.stderr)
class HelloWorldTransport(httpx.AsyncBaseTransport):
    """
    An httpx transport that sends plain HTTP/1.0 requests through an SSH tunnel.

    Every request opens a direct TCP channel over one shared SSH connection to
    the host named in the request URL, writes a minimal request, waits for the
    channel to close and parses everything that was received.

    NOTE(review): the request body and most request headers are not forwarded,
    and the request is always written as plain text (no TLS) -- this is a
    proof of concept, not a general-purpose transport.
    """

    def __init__(self, ssh_host: str = "localhost"):
        """Remember which SSH server to tunnel through; nothing is opened yet."""
        self._ssh_host = ssh_host
        self._conn = None

    async def connect(self):
        """Open the SSH connection that later requests are tunnelled through."""
        self._conn = await asyncssh.connect(self._ssh_host)

    async def aclose(self) -> None:
        """Close the SSH connection, if one was opened."""
        conn, self._conn = self._conn, None
        if conn is not None:
            conn.close()
            await conn.wait_closed()

    @staticmethod
    def _parse_response(raw: bytes) -> tuple[int, list[tuple[bytes, bytes]], bytes]:
        """Split a raw HTTP response into status code, header pairs and body.

        Raises:
            httpx.RemoteProtocolError: if the status line or a header line
                cannot be parsed.
        """
        head, separator, body = raw.partition(b"\r\n\r\n")
        if not separator:
            # Tolerate peers that end lines with a bare LF.
            head, _, body = raw.partition(b"\n\n")

        header_lines = head.splitlines()
        status_parts = header_lines[0].split(None, 2) if header_lines else []
        try:
            status_code = int(status_parts[1])
        except (IndexError, ValueError) as err:
            raise httpx.RemoteProtocolError(
                f"Malformed HTTP status line: {head[:80]!r}"
            ) from err

        # A list of pairs (not a dict) keeps repeated headers such as Set-Cookie.
        headers = []
        for line in header_lines[1:]:
            name, colon, value = line.partition(b":")
            if not colon:
                raise httpx.RemoteProtocolError(
                    f"Malformed HTTP header line: {line[:80]!r}"
                )
            headers.append((name.strip(), value.strip()))
        return status_code, headers, body

    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        """Send *request* through the SSH tunnel and return the parsed response.

        Raises:
            httpx.RemoteProtocolError: if the peer's answer is not a parsable
                HTTP response.
        """
        if self._conn is None:
            await self.connect()

        chan, session = await self._conn.create_connection(
            MySSHTCPSession, request.url.host, request.url.port or 80
        )

        # Ask the server to close the connection after answering: the response
        # is only parsed once the channel has been closed.
        chan.write(
            b"%s %s HTTP/1.0\r\nHost: %s\r\nConnection: close\r\n\r\n"
            % (
                request.method.encode("ascii"),
                request.url.raw_path or b"/",
                request.headers.get("Host", request.url.host).encode("ascii"),
            )
        )
        chan.write_eof()
        await chan.wait_closed()

        status_code, headers, body = self._parse_response(b"".join(session._buffer))
        return httpx.Response(
            status_code, headers=headers, stream=httpx.ByteStream(body)
        )
async def run_client(url: str | httpx.URL) -> httpx.Response:
    """Issue a GET for *url* through ``HelloWorldTransport`` and return the response.

    The client is used as an async context manager so it is closed even when
    the request raises.
    """
    async with httpx.AsyncClient(transport=HelloWorldTransport()) as client:
        return await client.get(url)
# Demo: run one request to completion and dump what came back.
request_coro = run_client("http://www.google.com")
response = asyncio.run(request_coro)

print("STATUS: ", response.status_code)
print("HEADERS: ", response.headers)
print("BODY: ", response.text)
import asyncio
import sys
from typing import Optional
import asyncssh
import httpx
class SSHSession(asyncssh.SSHClientSession):
    """Client session that collects every received chunk in ``buffer``."""

    def __init__(self):
        # Chunks are stored in arrival order.
        self.buffer = []

    def data_received(self, data: bytes, datatype: asyncssh.DataType) -> None:
        """Store one chunk of incoming data; ``datatype`` is not inspected."""
        self.buffer.append(data)

    def connection_lost(self, exc: Optional[Exception]) -> None:
        """Report the error on stderr when the connection ended with one."""
        if not exc:
            return
        print("Direct connection error:", str(exc), file=sys.stderr)
class SSHTransport(httpx.AsyncBaseTransport):
    """
    An httpx transport that speaks HTTP/1.0 over an SSH session channel.

    Each request opens a new session on one shared SSH connection, writes a
    single request line, waits for the channel to close and parses everything
    the session received as an HTTP response.

    NOTE(review): only the request line is sent -- request headers and body
    are not forwarded.
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 8022,
        username: str = "walkman",
        password: str = "blabla",
    ):
        """Remember the SSH endpoint and credentials; nothing is opened yet.

        The defaults reproduce the previously hard-coded demo values.
        """
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self._conn = None

    async def connect(self):
        """Open the SSH connection that later requests are sent over."""
        self._conn = await asyncssh.connect(
            self._host, self._port, username=self._username, password=self._password
        )

    async def aclose(self) -> None:
        """Close the SSH connection, if one was opened."""
        conn, self._conn = self._conn, None
        if conn is not None:
            conn.close()
            await conn.wait_closed()

    @staticmethod
    def _parse_response(raw: str) -> tuple[int, list[tuple[str, str]], str]:
        """Split a raw HTTP response into status code, header pairs and body.

        Raises:
            httpx.RemoteProtocolError: if the status line or a header line
                cannot be parsed.
        """
        head, separator, body = raw.partition("\r\n\r\n")
        if not separator:
            # Tolerate peers that end lines with a bare LF.
            head, _, body = raw.partition("\n\n")

        header_lines = head.splitlines()
        status_parts = header_lines[0].split(None, 2) if header_lines else []
        try:
            status_code = int(status_parts[1])
        except (IndexError, ValueError) as err:
            raise httpx.RemoteProtocolError(
                f"Malformed HTTP status line: {head[:80]!r}"
            ) from err

        # A list of pairs (not a dict) keeps repeated headers such as Set-Cookie.
        headers = []
        for line in header_lines[1:]:
            name, colon, value = line.partition(":")
            if not colon:
                raise httpx.RemoteProtocolError(
                    f"Malformed HTTP header line: {line[:80]!r}"
                )
            headers.append((name.strip(), value.strip()))
        return status_code, headers, body

    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        """Send *request* over a fresh SSH session and return the parsed response.

        Raises:
            httpx.RemoteProtocolError: if the peer's answer is not a parsable
                HTTP response.
        """
        if self._conn is None:
            await self.connect()
        assert self._conn is not None  # narrows the Optional for type checkers

        chan, session = await self._conn.create_session(SSHSession)
        chan.write(f"{request.method} {request.url.path} HTTP/1.0\r\n\r\n")
        chan.write_eof()
        await chan.wait_closed()

        status_code, headers, body = self._parse_response("".join(session.buffer))
        return httpx.Response(status_code, headers=headers, content=body)
async def run_client(url: str | httpx.URL) -> httpx.Response:
    """Issue a GET for *url* through ``SSHTransport`` and return the response.

    *url* may be relative; it is resolved against ``http://testserver``. The
    client is used as an async context manager so it is closed even when the
    request raises.
    """
    async with httpx.AsyncClient(
        transport=SSHTransport(), base_url="http://testserver"
    ) as client:
        return await client.get(url)
# Demo: request the root path once and dump what came back.
request_coro = run_client("/")
response = asyncio.run(request_coro)

print("STATUS:", response.status_code)
print("HEADERS:", response.headers)
print("BODY:", repr(response.text))
import asyncio
import asyncssh
from flask import Flask
# WSGI application object; the route below is registered on it.
app = Flask(__name__)


@app.route("/")
def hello_world():
    """Return the HTML fragment served for the root path."""
    greeting = "<p>Hello, World!</p>"
    return greeting
async def wsgi_response(process: asyncssh.SSHServerProcess) -> None:
    """Serve one HTTP/1.0 request read from an SSH process through ``app``.

    Reads a single request line from the process's stdin, builds a WSGI
    environ from it, calls the WSGI application and writes the status line,
    headers and body to the process's stdout. The process exits with status 0
    on success and 1 when the request line is malformed or the application
    raises.

    NOTE(review): request headers and body are not read, and body chunks are
    decoded as text before being written, so binary responses are not
    supported.
    """
    import io
    import sys

    write = process.stdout.write

    def write_chunk(chunk: bytes) -> None:
        # stdout is written with text here, so body bytes are decoded first.
        write(chunk.decode())

    def start_response(status, headers, exc_info=None):
        write(f"HTTP/1.0 {status}\r\n")
        for name, value in headers:
            write(f"{name}: {value}\r\n")
        write("\r\n")
        # PEP 3333 requires start_response to return a write(bytes) callable.
        return write_chunk

    # readline() keeps the line terminator; splitting on whitespace drops it
    # so SERVER_PROTOCOL does not end in "\r\n".
    request_line = await process.stdin.readline()
    parts = request_line.split()
    if len(parts) != 3:
        write("HTTP/1.0 400 Bad Request\r\n\r\n")
        process.exit(1)
        return
    method, target, version = parts
    print("REQUEST:", method, target, version)

    # WSGI expects the query string separately from the path.
    path, _, query = target.partition("?")
    sockname = process.get_extra_info("sockname") or ("localhost", 8022)
    env = {
        "REQUEST_METHOD": method,
        "SCRIPT_NAME": "",
        "PATH_INFO": path,
        "QUERY_STRING": query,
        "SERVER_NAME": str(sockname[0]),
        "SERVER_PORT": str(sockname[1]),
        "SERVER_PROTOCOL": version,
        "wsgi.version": (1, 0),
        "wsgi.url_scheme": "http",
        "wsgi.input": io.BytesIO(),  # no request body is forwarded
        "wsgi.errors": sys.stderr,
        "wsgi.multithread": False,
        "wsgi.multiprocess": False,
        "wsgi.run_once": False,
    }

    try:
        body = app(env, start_response)
        try:
            for chunk in body:
                write_chunk(chunk)
        finally:
            # PEP 3333: the server must call close() on the iterable if present.
            close = getattr(body, "close", None)
            if close is not None:
                close()
    except Exception as e:
        # Top-level boundary for one request: report and signal failure.
        print("Exception:", type(e), e)
        process.exit(1)
    else:
        process.exit(0)
class NoAuthSSHServer(asyncssh.SSHServer):
    """SSH server that lets every user in without authentication."""

    def begin_auth(self, username: str) -> bool:
        """Log the username and report that no authentication is needed."""
        print("begin_auth", username)
        # Immediately accept any username
        auth_required = False
        return auth_required
async def start_server() -> None:
    """Start the SSH listener that hands each process to ``wsgi_response``."""
    listen_host = ""
    listen_port = 8022
    host_keys = ["ssh_host_ed25519_key"]

    await asyncssh.create_server(
        NoAuthSSHServer,
        listen_host,
        listen_port,
        server_host_keys=host_keys,
        process_factory=wsgi_response,
    )
# asyncio.get_event_loop() is deprecated when no loop is running, so the loop
# is created and installed explicitly.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
server_coro = start_server()
try:
    loop.run_until_complete(server_coro)
    # Keep serving until the process is interrupted.
    loop.run_forever()
finally:
    loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment