Skip to content

Instantly share code, notes, and snippets.

@thsutton
Last active December 15, 2020 11:44
Show Gist options
  • Save thsutton/822a2073d120eae9d3f35a378d3373b8 to your computer and use it in GitHub Desktop.
Save thsutton/822a2073d120eae9d3f35a378d3373b8 to your computer and use it in GitHub Desktop.
Using asyncio to handle subprocess output line-by-line.
#!/bin/bash
fmt="$1"
n=0
while [[ n -lt 5 ]]; do
n=$((n + 1))
date "+o: $fmt"
echo -n "antici"
sleep 1
date "+e: $fmt" > /dev/stderr
sleep 1
echo "pation"
done
echo -n "wot" > /dev/stderr
sleep 2
exec 2<&-
n=0
while [[ n -lt 10 ]]; do
n=$((n + 1))
date "+o: $fmt"
done
#!/usr/bin/env python3
import asyncio
import logging
async def queue_messages(stream, queue, name):
while True:
l = await stream.readline()
if l:
s = l.decode().strip()
logging.info(f"Queuing message: {s}")
await queue.put(f"{name}: {s}")
else:
await queue.put(f"Closing {name}")
break
async def report_status(proc, queue):
r = await proc.wait()
await queue.put(f"Finished with status {r}")
async def run(s, q):
proc = await asyncio.create_subprocess_exec(
"./demo.sh", s,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
return await asyncio.gather(
queue_messages(proc.stderr, q, "stderr"),
queue_messages(proc.stdout, q, "stdin"),
report_status(proc, q)
)
async def log(q):
while True:
s = await q.get()
logging.warning(s)
async def main():
q = asyncio.Queue(maxsize=10)
t = asyncio.create_task(log(q), name="Logger")
await run('%Y%m%d-%H%M%S', q)
t.cancel()
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment