Skip to content

Instantly share code, notes, and snippets.

@xkortex
Last active March 6, 2021 01:20
Show Gist options
  • Save xkortex/9cb6237085aefea8ddf66e6c3c74c656 to your computer and use it in GitHub Desktop.
Save xkortex/9cb6237085aefea8ddf66e6c3c74c656 to your computer and use it in GitHub Desktop.
A server which runs subprocesses in response to HTTP requests and dumps stdout/stderr to terminal
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""WORK IN PROGRESS!! USE AT YOUR OWN RISK!!"""
import os
import sys
from typing import Dict, List, Optional
from urllib.parse import urlparse, ParseResult
from functools import partial
import asyncio
from loguru import logger
import pydantic
class ArgsRequest(pydantic.BaseModel):
"""Structure representing a request"""
path: str # path is the path of the command to run
args: List[str] = []
addr: str
message: Optional[str] = None
id: Optional[str] = None
bufsize: int = 4096
shell: bool = False
cwd: Optional[str] = None
env: Dict[str, str] = {}
timeout: Optional[float] = None
class ArgsResponse(pydantic.BaseModel):
"""Structure representing a response to ArgsRequest"""
status: int
message: Optional[str] = None
id: Optional[str] = None
class CliArgs(ArgsRequest):
path: Optional[str] = None
verb: str
addr: str
addrs: List[str] = None
allowed_commands: List[str] = None
message: Optional[str] = None
class Config:
extra = pydantic.Extra.ignore
def arg_parser():
import argparse
parser = argparse.ArgumentParser(
description="""Subprocess as-a-service. Very basic, only mirrors output to stdout/stderr"""
)
parser.add_argument(
"-a",
"--addr",
default='http://localhost:7701',
action="store",
type=str,
help="Main address to listen/talk on ",
)
parser.add_argument(
"-A", "--addrs", default=[], action="append", help="list multiple addresses "
)
subparsers = parser.add_subparsers(help="sub-commands")
parser_c = subparsers.add_parser("c", help="client")
parser_c.set_defaults(verb="client")
parser_c.add_argument(
"path", type=str, help="Path of the command to be issued"
)
parser_c.add_argument(
"args", nargs="*", type=str, default=[], help="Arguments to be passed to the command"
)
parser_c.add_argument(
"-m",
"--message",
default="Hello, world!",
action="store",
type=str,
help="message to send",
)
parser_s = subparsers.add_parser("s", help="server")
parser_s.set_defaults(verb="server")
parser_s.add_argument(
"allowed_commands",
nargs="*",
type=str,
help="Only run commands that match one of these filters",
)
return parser
def get_server_func(addr: str):
serve_pars: ParseResult = urlparse(addr)
if serve_pars.scheme == "unix":
return partial(asyncio.start_unix_server, path=serve_pars.path)
elif serve_pars.port and serve_pars.hostname:
return partial(
asyncio.start_server, host=serve_pars.hostname, port=serve_pars.port
)
else:
raise ValueError(f"cannot parse URI addr: {addr}")
def get_connection_func(addr):
pars = urlparse(addr)
logger.debug(pars)
if pars.scheme == "unix":
return partial(asyncio.open_unix_connection, path=pars.path)
elif pars.port and pars.hostname:
return partial(asyncio.open_connection, host=pars.hostname, port=pars.port)
else:
raise ValueError(f"cannot parse URI addr: {addr}")
async def tcp_subproc_client(args: ArgsRequest):
logger.info(args)
message = args.json()
addr = args.addr
open_connection = get_connection_func(addr)
reader, writer = await open_connection()
logger.info(f"Send: {message!r}")
writer.write(message.encode())
await writer.drain()
data = await reader.read(args.bufsize)
response = data.decode()
ok = response == message
logger.info(f"Received {ok}: {data.decode()!r}")
logger.debug("Close the connection")
writer.close()
await writer.wait_closed()
class SubprocRunner(object):
def __init__(self, args: CliArgs):
self.args = args
async def handle_run_subproc(self, reader, writer):
data = await reader.read(self.args.bufsize)
payload = ArgsRequest.parse_raw(data)
try:
addr = writer.get_extra_info("peername")
except Exception as exc:
addr = "{}: {}".format(exc.__class__.__name__, exc)
logger.info(f"Received {payload!r} from {addr!r}")
if payload.path not in self.args.allowed_commands:
resp = ArgsResponse(status=405, message=f"'{payload.path}' is a forbidden command")
writer.write(resp.json().encode())
await writer.drain()
writer.close()
return
# for simplicity, we are just gonna wait for the response
await self.run_subprocess(payload)
resp = ArgsResponse(status=200, message=f"'{payload.path}' success")
data = resp.json().encode()
logger.debug(f"Send: {data!r}")
writer.write(data)
await writer.drain()
logger.debug("Close the connection")
writer.close()
async def run_subprocess(self, cmd: ArgsRequest):
logger.info("start: \n{} {}".format(cmd, " ".join(cmd.args)))
process = await asyncio.create_subprocess_exec(
cmd.path,
*cmd.args,
)
return await process.wait()
async def spawn_server(addr, args: CliArgs):
start_server = get_server_func(addr)
runner = SubprocRunner(args)
server = await start_server(runner.handle_run_subproc)
myaddr = server.sockets[0].getsockname()
logger.info(f"Serving on {myaddr}")
async with server:
await server.serve_forever()
async def main_server(args: CliArgs):
addrs = args.addrs
if args.addr is not None:
addrs.append(args.addr)
loop = asyncio.get_event_loop()
futures = [loop.create_task(spawn_server(addr, args)) for addr in addrs]
return futures
def main():
parser = arg_parser()
args = parser.parse_args()
args = CliArgs(**vars(args))
logger.info(args)
verb = args.verb
if verb == "client":
c_args = ArgsRequest.parse_obj(args)
asyncio.run(tcp_subproc_client(c_args))
return
loop = asyncio.get_event_loop()
if verb == "server":
future = loop.create_task(main_server(args))
logger.debug(f"created {future}")
else:
raise ValueError(f"unknown command: {verb}")
loop.run_forever()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment