Last active
January 28, 2024 18:24
-
-
Save bukzor/62b8dc2e3de5d6e13dceb391aa0794eb to your computer and use it in GitHub Desktop.
Define a 'cap1fd' pytest fixture which captures stdout/stderr combined.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Define a 'cap1fd' pytest fixture which captures stdout/stderr combined.""" | |
from __future__ import annotations | |
import contextlib | |
from io import FileIO | |
# from typing import ContextManager | |
from typing import Generator | |
from typing import NamedTuple | |
from typing import Self | |
from _pytest.capture import CaptureBase | |
from _pytest.capture import CaptureFixture | |
from _pytest.capture import CaptureManager | |
from _pytest.fixtures import SubRequest | |
from _pytest.fixtures import fixture | |
# An OS-level file descriptor is just an integer; the alias only exists to
# make signatures in this module self-describing.
FD = int

# Descriptor numbers of the three standard streams.
STDIN: FD = 0
STDOUT: FD = 1
STDERR: FD = 2
@contextlib.contextmanager
def redirect(from_: FD, to: FD) -> Generator[FD, None, None]:
    """Temporarily make descriptor `from_` refer to the same file as `to`.

    Enable translating shell syntax `2>&1` to `redirect(2, 1)`.

    Yields:
        A duplicate of the *original* `from_`, which can be used to reach the
        pre-redirection destination while the redirect is active.  It is
        closed when the context exits, so it must not be used afterwards.

    Raises:
        OSError: if either descriptor is invalid.  Nothing is left open or
            redirected in that case.

    Note:
        Only OS-level descriptors are touched; buffers of Python file objects
        wrapping `from_` are not flushed here.
    """
    from os import close
    from os import dup
    from os import dup2

    saved = dup(from_)
    try:
        # Inside the `try` so that a failing dup2 cannot leak `saved`.
        dup2(to, from_)
        yield saved
    finally:
        try:
            # Put the original destination back in place.
            dup2(saved, from_)
        finally:
            # Always release the backup, even if the restore itself failed.
            close(saved)
class ActiveFDCapture(NamedTuple): | |
targetfd: FD | |
targetfd_save: FD | |
tmpfile: FileIO | |
redirect: contextlib.ExitStack | |
@classmethod | |
def open(cls, targetfd: FD) -> Self: | |
from tempfile import TemporaryFile | |
tmpfile: FileIO = TemporaryFile(buffering=0) | |
redirect_ = contextlib.ExitStack() | |
targetfd_save = redirect_.enter_context( | |
redirect(targetfd, tmpfile.fileno()) | |
) | |
return cls(targetfd, targetfd_save, tmpfile, redirect_) | |
def close(self) -> None: | |
self.redirect.__exit__(None, None, None) | |
self.tmpfile.close() | |
def read(self) -> bytes: | |
self.tmpfile.seek(0) | |
result = self.tmpfile.read() | |
self.tmpfile.truncate(0) | |
return result | |
def writeorg(self, data: bytes) -> None: | |
"""Write to original file descriptor.""" | |
import os | |
os.write(self.targetfd_save, data) | |
class CombinedCapture(ActiveFDCapture):
    """Capture `targetfd` and fold additional descriptors into the same file."""

    @classmethod
    def open(cls, targetfd: FD, *other_fds: FD) -> Self:
        """Capture `targetfd`, then point every one of `other_fds` at it.

        The extra redirections are pushed onto the same exit stack as the
        primary one, so a single ``close()`` undoes all of them.

        Raises:
            OSError: if any redirection fails; everything installed so far is
                undone and the temporary file is closed before re-raising.
        """
        self = super().open(targetfd)
        try:
            for other_fd in other_fds:
                # `self.redirect` is the exit stack; `redirect(...)` is the
                # module-level context manager.
                self.redirect.enter_context(redirect(other_fd, targetfd))
        except BaseException:
            # Do not leave descriptors half-redirected on failure.
            self.close()
            raise
        return self
class NoopCapture(ActiveFDCapture):
    """Capture state that installs no redirection at all.

    It offers the same interface as a real capture, but nothing is ever
    routed into its temporary file, and the "saved" descriptor is simply
    `targetfd` itself.
    """

    @classmethod
    def open(cls, targetfd: FD) -> Self:
        """Build the state with an empty temp file and an empty exit stack."""
        import tempfile

        return cls(
            targetfd=targetfd,
            targetfd_save=targetfd,
            tmpfile=tempfile.TemporaryFile(buffering=0),
            redirect=contextlib.ExitStack(),
        )
class ActiveStdioCapture(NamedTuple): | |
capture: ActiveFDCapture | |
@classmethod | |
def open(cls, targetfd: FD) -> Self: | |
if targetfd == 1: | |
return cls(CombinedCapture.open(targetfd, STDERR)) | |
elif targetfd == 2: | |
return cls(NoopCapture.open(targetfd)) | |
else: | |
raise ValueError(targetfd) | |
def close(self) -> None: | |
return self.capture.close() | |
def read(self) -> bytes: | |
return self.capture.read() | |
def writeorg(self, data: bytes) -> None: | |
return self.capture.writeorg(data) | |
class CombinedFDCapture(CaptureBase[str]):
    """Capture IO to/from a given OS-level file descriptor.

    snap() produces text.
    """

    EMPTY_BUFFER: str = ""
    # State of the running capture, or None while stopped/suspended.
    active: ActiveStdioCapture | None = None

    # ABC: CaptureBase {
    def __init__(  # pyright: ignore[reportMissingSuperCall]
        self, targetfd: int
    ) -> None:
        """Remember which descriptor to capture; nothing is redirected yet."""
        self.targetfd = targetfd

    def start(self) -> None:
        """Start capturing on targetfd into a newly opened capture state."""
        assert self.active is None
        self.active = ActiveStdioCapture.open(self.targetfd)

    def done(self) -> None:
        """Stop capturing and release the active capture state."""
        assert self.active is not None
        self.active.close()
        self.active = None

    def suspend(self) -> None:
        """Suspend capturing by fully stopping it.

        NOTE: this closes the active capture, so output that was captured
        but not yet read is discarded.
        """
        return self.done()

    def resume(self) -> None:
        """Resume capturing by starting a fresh capture."""
        return self.start()

    def writeorg(self, data: str) -> None:
        """Write to original file descriptor."""
        assert self.active is not None
        self.active.writeorg(data.encode("utf-8"))

    def snap(self) -> str:
        """Return the text captured since the previous snapshot.

        Bytes that are not valid UTF-8 are replaced rather than raising, so
        arbitrary child-process output cannot crash the capture.
        """
        assert self.active is not None
        return self.active.read().decode("utf-8", errors="replace")

    # } ABC: CaptureBase

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} {self.targetfd} active={self.active}>"
@fixture
def cap1fd(request: SubRequest) -> Generator[CaptureFixture[str], None, None]:
    r"""Enable text capturing of writes to stdout and stderr, combined.

    This enables accurate temporal interleaving of writes, at the cost of
    losing the ability to distinguish between out and err. This is often
    beneficial because:

    1. it's how users and logs will see the output, by default
    2. the temporal relationship between output and errors is highly
       relevant to users and to tests
    3. under test, combined logs are easier to understand and debug

    ``cap1fd.readouterr()`` will return combined output as `out` and
    empty-string as `err`.

    Returns an instance of :class:`CaptureFixture[str]`.

    Example:

    .. code-block:: python

        def test_system_echo(cap1fd):
            os.system('echo "hello"; echo ERROR >&2')
            captured = cap1fd.readouterr()
            assert captured.out == "hello\nERROR\n"
    """
    capman = request.config.pluginmanager.getplugin("capturemanager")
    assert isinstance(capman, CaptureManager), capman
    capture_fixture = CaptureFixture(
        CombinedFDCapture, request, _ispytest=True
    )
    # Register before entering the `try`: if registration is refused (for
    # example because another capture fixture is already active), the cleanup
    # below must not run and unregister that other fixture.
    capman.set_fixture(capture_fixture)
    try:
        capture_fixture._start()  # pyright: ignore[reportPrivateUsage]
        yield capture_fixture
    finally:
        try:
            capture_fixture.close()
        finally:
            # Always unregister, even if closing the capture failed.
            capman.unset_fixture()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment