Skip to content

Instantly share code, notes, and snippets.

@bukzor
Last active January 28, 2024 18:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bukzor/62b8dc2e3de5d6e13dceb391aa0794eb to your computer and use it in GitHub Desktop.
Save bukzor/62b8dc2e3de5d6e13dceb391aa0794eb to your computer and use it in GitHub Desktop.
Define a 'cap1fd' pytest fixture which captures stdout/stderr combined.
"""Define a 'cap1fd' pytest fixture which captures stdout/stderr combined."""
from __future__ import annotations
import contextlib
from io import FileIO
# from typing import ContextManager
from typing import Generator
from typing import NamedTuple
from typing import Self
from _pytest.capture import CaptureBase
from _pytest.capture import CaptureFixture
from _pytest.capture import CaptureManager
from _pytest.fixtures import SubRequest
from _pytest.fixtures import fixture
FD = int
STDIN: FD = 0
STDOUT: FD = 1
STDERR: FD = 2
@contextlib.contextmanager
def redirect(from_: FD, to: FD) -> Generator[FD, None, None]:
"""Enable translating shell syntax `2>&1` to `redirect(2, 1)`."""
from os import close
from os import dup
from os import dup2
tmp = dup(from_)
dup2(to, from_)
try:
yield tmp
finally:
dup2(tmp, from_)
close(tmp)
class ActiveFDCapture(NamedTuple):
targetfd: FD
targetfd_save: FD
tmpfile: FileIO
redirect: contextlib.ExitStack
@classmethod
def open(cls, targetfd: FD) -> Self:
from tempfile import TemporaryFile
tmpfile: FileIO = TemporaryFile(buffering=0)
redirect_ = contextlib.ExitStack()
targetfd_save = redirect_.enter_context(
redirect(targetfd, tmpfile.fileno())
)
return cls(targetfd, targetfd_save, tmpfile, redirect_)
def close(self) -> None:
self.redirect.__exit__(None, None, None)
self.tmpfile.close()
def read(self) -> bytes:
self.tmpfile.seek(0)
result = self.tmpfile.read()
self.tmpfile.truncate(0)
return result
def writeorg(self, data: bytes) -> None:
"""Write to original file descriptor."""
import os
os.write(self.targetfd_save, data)
class CombinedCapture(ActiveFDCapture):
@classmethod
def open(cls, targetfd: FD, *other_fds: FD) -> Self:
self = super().open(targetfd)
for other_fd in other_fds:
self.redirect.enter_context(redirect(other_fd, targetfd))
return self
class NoopCapture(ActiveFDCapture):
@classmethod
def open(cls, targetfd: FD) -> Self:
from tempfile import TemporaryFile
tmpfile: FileIO = TemporaryFile(buffering=0)
redirect = contextlib.ExitStack()
return cls(targetfd, targetfd, tmpfile, redirect)
class ActiveStdioCapture(NamedTuple):
capture: ActiveFDCapture
@classmethod
def open(cls, targetfd: FD) -> Self:
if targetfd == 1:
return cls(CombinedCapture.open(targetfd, STDERR))
elif targetfd == 2:
return cls(NoopCapture.open(targetfd))
else:
raise ValueError(targetfd)
def close(self) -> None:
return self.capture.close()
def read(self) -> bytes:
return self.capture.read()
def writeorg(self, data: bytes) -> None:
return self.capture.writeorg(data)
class CombinedFDCapture(CaptureBase[str]):
"""Capture IO to/from a given OS-level file descriptor.
snap() produces text.
"""
EMPTY_BUFFER: str = ""
active: ActiveStdioCapture | None = None
# ABC: CaptureBase {
def __init__( # pyright: ignore[reportMissingSuperCall]
self, targetfd: int
) -> None:
self.targetfd = targetfd
def start(self) -> None:
"""Start capturing on targetfd using memorized tmpfile."""
assert self.active is None
self.active = ActiveStdioCapture.open(self.targetfd)
def done(self) -> None:
"""Stop capturing, restore streams, return original capture file,
seeked to position zero."""
assert self.active is not None
self.active.close()
self.active = None
def suspend(self) -> None:
return self.done()
def resume(self) -> None:
return self.start()
def writeorg(self, data: str) -> None:
"""Write to original file descriptor."""
assert self.active is not None
self.active.writeorg(data.encode())
def snap(self) -> str:
assert self.active is not None
return self.active.read().decode()
# } ABC: CaptureBase
def __repr__(self) -> str:
return "<{} {} active={}>".format(
self.__class__.__name__, self.targetfd, self.active
)
@fixture
def cap1fd(request: SubRequest) -> Generator[CaptureFixture[str], None, None]:
r"""Enable text capturing of writes to stdout and stderr, combined.
This enables accurate temporal interleaving of writes, at the cost of
losing the ability to distinguish between out and err. This is often
beneficical because:
1. it's how users and logs will see the output, by default
3. the temporal relationship between output and errors is highly
relevant to users and to tests
3. under test, combined longs are easier to understand and debug
``cap1fd.readouterr()`` will return combined output as `out` and
empty-string as `err`.
Returns an instance of :class:`CaptureFixture[str]`.
Example:
.. code-block:: python
def test_system_echo(cap1fd):
os.system('echo "hello"; echo ERROR >&2')
captured = cap1fd.readouterr()
assert captured.out == "hello\nERROR\n"
"""
capman = request.config.pluginmanager.getplugin("capturemanager")
assert isinstance(capman, CaptureManager), capman
capture_fixture = CaptureFixture(
CombinedFDCapture, request, _ispytest=True
)
try:
capman.set_fixture(capture_fixture)
capture_fixture._start() # pyright: ignore[reportPrivateUsage]
yield capture_fixture
finally:
capture_fixture.close()
capman.unset_fixture()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment