Skip to content

Instantly share code, notes, and snippets.

@nat-n
Last active November 11, 2019 17:29
Show Gist options
  • Save nat-n/4db690e10eb833dd654bdae2c0ca22d0 to your computer and use it in GitHub Desktop.
Save nat-n/4db690e10eb833dd654bdae2c0ca22d0 to your computer and use it in GitHub Desktop.
A proof of concept for using asyncio to manage and dynamically interact with a subprocess via stdio.
import asyncio
class Worker:
def __init__(self, cmd, limit=None, separator=b"\n"):
self.cmd = cmd
self.limit = limit
self.separator = separator
async def send(self, data):
self._proc.stdin.write(data)
await self._proc.stdin.drain()
async def start(self, on_stdout=None, on_stderr=None):
if on_stdout is None and hasattr(self, "_handle_stdout"):
on_stdout = self._handle_stdout
if on_stderr is None and hasattr(self, "_handle_stderr"):
on_stderr = self._handle_stderr
proc_kwargs = {"stdin": asyncio.subprocess.PIPE}
if on_stdout:
proc_kwargs["stdout"] = asyncio.subprocess.PIPE
if on_stderr:
proc_kwargs["stderr"] = asyncio.subprocess.PIPE
if self.limit:
proc_kwargs["limit"] = self.limit
self._proc = await asyncio.create_subprocess_exec(*self.cmd, **proc_kwargs)
if on_stdout:
self._stdout_task = asyncio.get_running_loop().create_task(
self._buffer_stream(self._proc.stdout, on_stdout)
)
if on_stderr:
self._stderr_task = asyncio.get_running_loop().create_task(
self._buffer_stream(self._proc.stderr, on_stderr)
)
async def _buffer_stream(self, stream, callback):
while self._proc.returncode is None:
try:
if isinstance(self.separator, bytes):
data = await stream.readuntil(separator=self.separator)
else:
data = await stream.readexactly(self.separator)
except asyncio.IncompleteReadError as error:
data = error.partial
if data:
await callback(data)
return
await callback(data)
async def stop(self, grace_period=1):
self._proc.terminate()
if hasattr(self, "_stdout_task"):
self._stdout_task.cancel()
if hasattr(self, "_stderr_task"):
self._stderr_task.cancel()
await asyncio.sleep(grace_period)
if self._proc.returncode is None:
self._proc.kill()
class Lancelot(Worker):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.answers = (
"My name is 'Sir Lancelot of Camelot'.\n",
"To seek the Holy Grail.\n",
"Blue.\n",
)
self.question_number = 0
async def _handle_stdout(self, data):
print("OUT:", data.decode())
if len(self.answers) > self.question_number:
await self.send(self.answers[self.question_number].encode())
self.question_number += 1
async def _handle_stderr(self, data):
print("ERR:", data.decode())
async def main():
# Configure a new worker
knight = Lancelot(("python", "./bridgekeeper.py"), separator=b"?")
# Start the subprocess in this worker
await knight.start()
# Gotta keep the main coro alive to give the worker time to work
for _ in range(100):
if knight._proc.returncode is not None:
print("returncode:", knight._proc.returncode)
break
await asyncio.sleep(0.01)
asyncio.get_event_loop().run_until_complete(main())
name = input("What... is your name?")
quest = input("What... is your quest?")
color = input("What... is your favorite color?")
raise Exception(f"You answered: {name!r}, {quest!r}, {color!r}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment