Skip to content

Instantly share code, notes, and snippets.

@kalebo
Last active March 13, 2024 20:53
Show Gist options
  • Save kalebo/1e085ee36de45ffded7e5d9f857265d0 to your computer and use it in GitHub Desktop.
Save kalebo/1e085ee36de45ffded7e5d9f857265d0 to your computer and use it in GitHub Desktop.
Emulate tee with python's async subprocess (>= 3.6)
class RunOutput():
def __init__(self, returncode, stdout, stderr):
self.returncode = returncode
self.stdout = stdout
self.stderr = stderr
# the following three commands are adapted from https://kevinmccarthy.org/2016/07/25/streaming-subprocess-stdin-and-stdout-with-asyncio-in-python/
async def _read_stream(stream, callback):
while True:
line = await stream.readline()
if line:
callback(line)
else:
break
async def _stream_subprocess(cmd, stdin=None, quiet=False, echo=False) -> RunOutput:
if isWindows():
platform_settings = {'env': os.environ}
else:
platform_settings = {'executable': '/bin/bash'}
if echo:
print(cmd)
p = await asyncio.create_subprocess_shell(cmd,
stdin=stdin,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
**platform_settings)
out = []
err = []
def tee(line, sink, pipe, label=""):
line = line.decode('utf-8').rstrip()
sink.append(line)
if not quiet:
print(label, line, file=pipe)
await asyncio.wait([
_read_stream(p.stdout, lambda l: tee(l, out, sys.stdout)),
_read_stream(p.stderr, lambda l: tee(l, err, sys.stderr, label="ERR:")),
])
return RunOutput(await p.wait(), out, err)
def rrun(cmd, stdin=None, quiet=False, echo=False) -> Result:
loop = asyncio.get_event_loop()
result = loop.run_until_complete(
_stream_subprocess(cmd, stdin=stdin, quiet=quiet, echo=echo)
)
# loop.close()
# Note that Error and Okay are from a result monad: see https://gist.github.com/kalebo/65642ac3d9326a6916bb1be7334cc353
if result.returncode != 0:
return Error(result)
else:
return Ok(result)
@LiraNuna
Copy link

LiraNuna commented Sep 5, 2022

@ssbarnea You should be able to safely swap asyncio.wait for asyncio.gather. In this instance they are both equal.
You will need to remove the wrapping list, though.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment