Skip to content

Instantly share code, notes, and snippets.

@mherkazandjian
Last active May 11, 2022 22:53
Show Gist options
  • Save mherkazandjian/08aff4945589d0d8adbc0c82eaef92c1 to your computer and use it in GitHub Desktop.
Save mherkazandjian/08aff4945589d0d8adbc0c82eaef92c1 to your computer and use it in GitHub Desktop.
async capture of stdout while another coroutine is running and doing stuff
import asyncio
import shlex
async def run_command(command: str):
"""
Coroutine that runs a command and captures the output as it is generated
The stdout output is collected in an async fashion one line at a time.
Each line is logged to the output for demonstration purposes. Once can
do other kind of processing to the read line by passing a callback to this
function (need to modify the signature to accept a callable though. In
contrast to subprocess.Popen, no threading was used to achieve this.
:param command: The command to be executed
:return: the collected output as a list of lines
"""
process_coro = asyncio.create_subprocess_exec(
*shlex.split(command),
stdout=asyncio.subprocess.PIPE,
)
process = await process_coro
stdout_buffer = []
while True:
stdout_line = await process.stdout.readline()
# process the output one line at a time
print('coroutine subprocess : ' + stdout_line.decode().strip())
if not stdout_line:
break
stdout_buffer.append(stdout_line)
await process.wait()
return stdout_buffer
async def do_something():
"""coroutine that writes a string to stdout and gets suspended a bit"""
dt_sleep = 5
while True:
print('coroutine do_something: sleep for {} seconds'.format(dt_sleep))
await asyncio.sleep(dt_sleep)
return True
async def main():
tasks = [
asyncio.create_task(do_something()),
asyncio.create_task(run_command('ping localhost'))
]
results = await asyncio.gather(*tasks)
return results
event_loop = asyncio.new_event_loop()
event_loop.run_until_complete(main())
event_loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment