Created
December 6, 2018 20:33
-
-
Save Gelbpunkt/3b9abea22ae5dd27f3047a5ce0bae953 to your computer and use it in GitHub Desktop.
Example for asyncio shell commands
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from asyncio.subprocess import PIPE, STDOUT | |
class Result:
    """Outcome of a finished shell command.

    Attributes:
        status: The value passed as ``status`` (the caller's exit code).
        stdout: Decoded text of the ``stdout`` bytes, or ``None`` if no
            bytes object was given.
        stderr: Decoded text of the ``stderr`` bytes, or ``None`` if no
            bytes object was given.
    """

    def __init__(self, status, stdout, stderr):
        """Store the exit status and decode the captured output.

        Args:
            status: Exit status to record.
            stdout: Captured standard output as ``bytes``, or ``None``.
            stderr: Captured standard error as ``bytes``, or ``None``.
        """
        self.status = status
        # Raw payloads kept for size reporting in __repr__; a missing (or
        # empty) stream collapses to "" so len() is always valid.
        self._stdout = stdout or ""
        self._stderr = stderr or ""
        self.stdout = self._decode(stdout)
        self.stderr = self._decode(stderr)

    @staticmethod
    def _decode(raw):
        """Return *raw* decoded as UTF-8 text, or ``None`` when *raw* is ``None``."""
        if raw is None:
            return None
        # Command output is not guaranteed to be valid UTF-8; substitute
        # U+FFFD for undecodable bytes instead of raising and losing the
        # whole result.
        return raw.decode("utf-8", errors="replace")

    def __repr__(self):
        """Return a short summary: status plus the raw byte length of each stream."""
        return f"<Result status={self.status} stdout={len(self._stdout)} stderr={len(self._stderr)}>"
async def run(shell_command, *, merge_stderr=True):
    """Run *shell_command* through the shell and collect its output.

    Args:
        shell_command: Command line handed to the shell as-is. The caller
            is responsible for quoting any untrusted parts (for example
            with ``shlex.quote``) to avoid shell injection.
        merge_stderr: When true (the default, matching the historical
            behaviour) the child's stderr is redirected into stdout, so
            the returned ``Result.stderr`` is ``None``. Pass ``False`` to
            capture stderr through its own pipe instead.

    Returns:
        A ``Result`` built from the process return code and the captured
        stdout/stderr bytes.
    """
    process = await asyncio.create_subprocess_shell(
        shell_command,
        stdin=PIPE,
        stdout=PIPE,
        stderr=STDOUT if merge_stderr else PIPE,
    )
    # communicate() reads both streams to EOF and waits for the process to
    # exit, so returncode is populated once it returns.
    stdout, stderr = await process.communicate()
    return Result(process.returncode, stdout, stderr)
if __name__ == "__main__":
    import sys

    async def test():
        """Run ``pwd`` and print its stdout, stderr and exit status."""
        r = await run("pwd")
        print(r.stdout, r.stderr, r.status)

    if sys.platform.startswith("win"):
        # Subprocess pipes only work with the proactor event loop on Windows.
        loop = asyncio.ProactorEventLoop()
    else:
        # Create the loop explicitly: calling get_event_loop() with no
        # running loop is deprecated in recent Python versions.
        loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(test())
    finally:
        # Release the loop's resources even if the demo raised.
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment