Skip to content

Instantly share code, notes, and snippets.

@gh640
Created December 12, 2021 02:45
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gh640/50953484edfa846fda9a95374df57900 to your computer and use it in GitHub Desktop.
Save gh640/50953484edfa846fda9a95374df57900 to your computer and use it in GitHub Desktop.
Python: Stream output of `asyncio.create_subprocess_exec()`
"""Stream output of `asyncio.create_subprocess_exec()`"""
import asyncio
import sys
async def run(program: str, args: list[str]):
"""Capture output (stdout and stderr) while running external command."""
proc = await asyncio.create_subprocess_exec(
program, *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
while True:
if proc.stdout.at_eof() and proc.stderr.at_eof():
break
stdout = (await proc.stdout.readline()).decode()
if stdout:
print(f'[stdout] {stdout}', end='', flush=True)
stderr = (await proc.stderr.readline()).decode()
if stderr:
print(f'[sdterr] {stderr}', end='', flush=True, file=sys.stderr)
await asyncio.sleep(1)
await proc.communicate()
print(f'{program} {" ".join(args)} exited with {proc.returncode}')
async def run2(program: str, args: list[str]):
"""Run external command without capturing output (stdout and stderr)."""
proc = await asyncio.create_subprocess_exec(program, *args)
await proc.communicate()
class Runner:
"""Run external command with output capturing.
Usage:
runner = Runner(program, args, interval)
runner.start()
async for stdout, stderr in runner.stream():
...
returncode = runner.wait()
"""
def __init__(self, program, args, interval=1):
self.proc = None
self.program = program
self.args = args
self.interval = interval
async def start(self):
self.proc = await asyncio.create_subprocess_exec(
self.program,
*self.args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
async def stream(self):
while True:
if self.proc.stdout.at_eof() and self.proc.stderr.at_eof():
break
stdout = await self.proc.stdout.readline()
stderr = await self.proc.stderr.readline()
yield stdout.decode(), stderr.decode()
await asyncio.sleep(self.interval)
async def wait(self):
if self.proc is None:
return None
await self.proc.communicate()
return self.proc.returncode
async def main():
runner = Runner('sh', ['./never-ending-script.sh'])
await runner.start()
async for stdout, stderr in runner.stream():
if stdout:
print(f'[stdout] {stdout}', end='', flush=True)
if stderr:
print(f'[sdterr] {stderr}', end='', flush=True, file=sys.stderr)
returncode = await runner.wait()
print(f'return code is {returncode}')
if __name__ == '__main__':
asyncio.run(run('sh', ['./never-ending-script.sh']))
asyncio.run(run2('sh', ['./never-ending-script.sh']))
asyncio.run(main())
@eastonsuo
Copy link

should read stdout and stderr at the same time

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment