Skip to content

Instantly share code, notes, and snippets.

@kalebo
Last active March 13, 2024 20:53
Show Gist options
  • Save kalebo/1e085ee36de45ffded7e5d9f857265d0 to your computer and use it in GitHub Desktop.
Save kalebo/1e085ee36de45ffded7e5d9f857265d0 to your computer and use it in GitHub Desktop.
Emulate tee with python's async subprocess (>= 3.6)
class RunOutput():
def __init__(self, returncode, stdout, stderr):
self.returncode = returncode
self.stdout = stdout
self.stderr = stderr
# the following three commands are adapted from https://kevinmccarthy.org/2016/07/25/streaming-subprocess-stdin-and-stdout-with-asyncio-in-python/
async def _read_stream(stream, callback):
while True:
line = await stream.readline()
if line:
callback(line)
else:
break
async def _stream_subprocess(cmd, stdin=None, quiet=False, echo=False) -> RunOutput:
if isWindows():
platform_settings = {'env': os.environ}
else:
platform_settings = {'executable': '/bin/bash'}
if echo:
print(cmd)
p = await asyncio.create_subprocess_shell(cmd,
stdin=stdin,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
**platform_settings)
out = []
err = []
def tee(line, sink, pipe, label=""):
line = line.decode('utf-8').rstrip()
sink.append(line)
if not quiet:
print(label, line, file=pipe)
await asyncio.wait([
_read_stream(p.stdout, lambda l: tee(l, out, sys.stdout)),
_read_stream(p.stderr, lambda l: tee(l, err, sys.stderr, label="ERR:")),
])
return RunOutput(await p.wait(), out, err)
def rrun(cmd, stdin=None, quiet=False, echo=False) -> Result:
loop = asyncio.get_event_loop()
result = loop.run_until_complete(
_stream_subprocess(cmd, stdin=stdin, quiet=quiet, echo=echo)
)
# loop.close()
# Note that Error and Okay are from a result monad: see https://gist.github.com/kalebo/65642ac3d9326a6916bb1be7334cc353
if result.returncode != 0:
return Error(result)
else:
return Ok(result)
@ssbarnea
Copy link

Sadly this example is outdated as it raises a runtime warning with python 3.9:

DeprecationWarning: The explicit passing of coroutine objects to asyncio.wait() is deprecated since Python 3.8, and scheduled for removal in Python 3.11.

I am quite interested in a good tee implementation, one that has a good test matrix.

@LiraNuna
Copy link

LiraNuna commented Sep 5, 2022

@ssbarnea You should be able to safely swap asyncio.wait for asyncio.gather. In this instance they are both equal.
You will need to remove the wrapping list, though.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment