Skip to content

Instantly share code, notes, and snippets.

@ddelange
Last active May 10, 2024 16:28
Show Gist options
  • Save ddelange/6517e3267fb74eeee804e3b1490b1c1d to your computer and use it in GitHub Desktop.
Handling the live output stream of a command
import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen
def _drain_stream(stream, handler):
    """Pass every line of *stream* to *handler*, without its trailing newline."""
    for line in stream:
        # Lines are ``str`` in text mode and ``bytes`` otherwise.
        newline = "\n" if isinstance(line, str) else b"\n"
        # Only strip a newline that is really there: the last line of the
        # output may lack one, and a blind ``line[:-1]`` would eat a character.
        handler(line[:-1] if line.endswith(newline) else line)


def stream_command(
    args,
    *,
    stdout_handler=logging.info,
    stderr_handler=logging.error,
    check=True,
    text=True,
    **kwargs,
):
    """Mimic subprocess.run, while processing the command output in real time.

    stdout and stderr of the child are read concurrently (one thread each) so
    that neither pipe can fill up and block the child while the other is read.

    Args:
        args: The command to run, forwarded to ``Popen``.
        stdout_handler: Callable invoked once per stdout line (newline removed).
        stderr_handler: Callable invoked once per stderr line (newline removed).
        check: If true, raise ``CalledProcessError`` on a non-zero exit status.
        text: Forwarded to ``Popen``; lines are ``str`` if true, else ``bytes``.
        **kwargs: Any further ``Popen`` keyword arguments (``stdout`` and
            ``stderr`` are set by this function and must not be passed).

    Returns:
        A ``CompletedProcess`` holding ``args`` and ``returncode``. The output
        itself is not captured; it has already been given to the handlers.

    Raises:
        CalledProcessError: If ``check`` is true and the exit status is non-zero.
        Exception: Whatever a handler raised is re-raised in the calling thread
            once both streams have been processed.
    """
    with Popen(args, text=text, stdout=PIPE, stderr=PIPE, **kwargs) as process:
        with ThreadPoolExecutor(2) as pool:  # two threads to handle the streams
            futures = [
                pool.submit(_drain_stream, process.stdout, stdout_handler),
                pool.submit(_drain_stream, process.stderr, stderr_handler),
            ]
        # Leaving the pool waited for both readers; surface handler errors
        # here instead of dropping them together with the futures.
        for future in futures:
            future.result()
    # wait() (unlike poll()) always yields the real exit status, never None.
    retcode = process.wait()
    if check and retcode:
        raise CalledProcessError(retcode, process.args)
    return CompletedProcess(process.args, retcode)
import logging
from subprocess import PIPE, STDOUT, CalledProcessError, CompletedProcess, Popen
def stream_command(
    args,
    *,
    stdout_handler=logging.info,
    check=True,
    text=True,
    **kwargs,
):
    """Mimic subprocess.run, while processing the command output in real time.

    stderr of the child is redirected into its stdout, so a single handler
    receives both streams, line by line, in the order the child wrote them.

    Args:
        args: The command to run, forwarded to ``Popen``.
        stdout_handler: Callable invoked once per output line (newline removed).
        check: If true, raise ``CalledProcessError`` on a non-zero exit status.
        text: Forwarded to ``Popen``; lines are ``str`` if true, else ``bytes``.
        **kwargs: Any further ``Popen`` keyword arguments (``stdout`` and
            ``stderr`` are set by this function and must not be passed).

    Returns:
        A ``CompletedProcess`` holding ``args`` and ``returncode``. The output
        itself is not captured; it has already been given to the handler.

    Raises:
        CalledProcessError: If ``check`` is true and the exit status is non-zero.
    """
    with Popen(args, text=text, stdout=PIPE, stderr=STDOUT, **kwargs) as process:
        for line in process.stdout:
            # Lines are ``str`` in text mode and ``bytes`` otherwise.
            newline = "\n" if isinstance(line, str) else b"\n"
            # Only strip a newline that is really there: the last line of the
            # output may lack one, and ``line[:-1]`` would eat a character.
            stdout_handler(line[:-1] if line.endswith(newline) else line)
    # wait() (unlike poll()) always yields the real exit status, never None.
    retcode = process.wait()
    if check and retcode:
        raise CalledProcessError(retcode, process.args)
    return CompletedProcess(process.args, retcode)
# The default handlers are logging.info / logging.error. An unconfigured root
# logger only emits WARNING and above, so opt in to INFO records explicitly;
# otherwise the INFO lines shown below would never be printed.
logging.basicConfig(level=logging.INFO)

stream_command(["echo", "test"])
# INFO:root:test

# check=False: the non-zero exit status of `cat` does not raise.
stream_command("cat ./nonexist", shell=True, check=False)
# ERROR:root:cat: ./nonexist: No such file or directory
# (with the stream_command variant that merges stderr into stdout, this line
# is passed to stdout_handler instead and is therefore logged at INFO level)

# Any callable that accepts one line can be used as a handler.
stream_command(["echo", "test"], stdout_handler=print)
# test
# Collects every line the custom handler below receives.
stdout_lines: list = []


def handler(line):
    """Print *line*, log it at INFO level, and record it in ``stdout_lines``."""
    print(line)
    logging.info(line)
    stdout_lines.append(line)


# A handler may do several things with each line of output.
stream_command(["echo", "test"], stdout_handler=handler)
# test
# INFO:root:test
print(stdout_lines)
# ['test']
# stream to log file
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s:%(levelname)-7s %(filename)20s:%(lineno)-4d %(name)s:%(message)s",
    filename="./capture.log",
    filemode="w",
    encoding="utf-8",
    # basicConfig() silently does nothing once the root logger has handlers,
    # and any earlier logging.info()/logging.error() call installs one
    # implicitly. force=True replaces them so records really reach the file.
    force=True,
)
# Both the direct log call and the subprocess output end up in ./capture.log.
logging.info("test from python")
stream_command(["echo", "test from subprocess"])
@ddelange
Copy link
Author

ddelange commented Jul 6, 2023

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment