Skip to content

Instantly share code, notes, and snippets.

@Glutexo
Last active March 21, 2022 16:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Glutexo/71c2d84fbcbdcc0238b162652425d6f4 to your computer and use it in GitHub Desktop.
Save Glutexo/71c2d84fbcbdcc0238b162652425d6f4 to your computer and use it in GitHub Desktop.
Get a file from an Insights archive
from collections import namedtuple
from contextlib import contextmanager
from functools import partial
from io import BytesIO
from io import StringIO
from os.path import join
from re import escape
from re import fullmatch
from shlex import quote
from tarfile import open as tar_open
from tarfile import TarFile
from tarfile import TarInfo
from tempfile import TemporaryDirectory
from unittest.mock import patch
# Not from sys import, for the pytest capsys fixture to work.
# See https://github.com/pytest-dev/pytest/issues/8900
import sys
try:
from pytest import mark
from pytest import raises
except ImportError:
PYTEST = False
else:
PYTEST = True
# Reads Insights Client archive path from a --no-upload run and prints contents
# of a single file in the archive.
# Tested with Python 3.6.8, pytest-3.0.6.
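# Usage:
#   Read the archive path from the output of an insights-client run:
#     $ insights-client --no-upload | \
#         python get_file_from_insights_archive.py data/etc/ssh/sshd_config
#   Or pass the archive path explicitly as the first argument:
#     $ python get_file_from_insights_archive.py \
#         /var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz \
#         data/etc/ssh/sshd_config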
# Caveats:
# Does not work with --verbose.
# Run tests:
# $ pytest get_file_from_insights_archive.py
# Example input:
# Starting to collect Insights data for localhost.localdomain
# Archive saved at /var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz
# Encoding used to decode the bytes of an archive member before printing.
_ENCODING = "utf-8"

# Directory under which the archive is expected to be stored.
_TMP_PATH = "/var/tmp"

# Full archive path: <_TMP_PATH>/<one directory>/<archive name>.tar.gz
#   group 1: archive name without the ".tar.gz" extension,
#   group 2: archive name prefix ("insights-..." or "soscleaner"), i.e. the
#            name without the trailing "-<14 or more digits>" part.
_PATH_PATTERN = (
    escape(_TMP_PATH) +
    r"/[^/]+/((insights-.+?|soscleaner)-\d{14,})\.tar\.gz"
)

# A whole input line announcing where the archive was saved; group 1 is the
# archive path. The line terminator is optional so that the announcement is
# still recognised when it is the last line of the input and the input does
# not end with a newline.
_STDIN_PATTERN = r"Archive saved at (.+)\n?"

# Callable matching one complete input line against _STDIN_PATTERN.
_stdin_match = partial(fullmatch, _STDIN_PATTERN)

# NOTE(review): not referenced anywhere in the visible code; kept so that the
# module namespace stays unchanged.
_ArchiveInfo = namedtuple("_ArchiveInfo", ("path", "name"))
def _find_archive_path_in_stdin():
    """Return the archive path announced on standard input.

    Lines of ``sys.stdin`` are read lazily, one by one, and matched with
    ``_stdin_match``; reading stops at the first line that matches.

    Returns:
        The first captured group of the first matching line.

    Raises:
        ValueError: If no line of the input matches.
    """
    candidates = (_stdin_match(line) for line in sys.stdin)
    first_hit = next((found for found in candidates if found), None)
    if first_hit is None:
        raise ValueError("Archive path not found.")
    return first_hit[1]
@contextmanager
def _open_archive(archive_path):
    """Context manager yielding the gzip-compressed tar archive opened
    for reading.

    Args:
        archive_path: Path of the ``.tar.gz`` file to open.

    Yields:
        The opened archive object; it is closed by the underlying ``with``
        statement once the caller's block is left.
    """
    with tar_open(name=archive_path, mode="r:gz") as gz_archive:
        yield gz_archive
def _member_path(archive_path, file_path):
    """Build the name of an archive member from the archive and file paths.

    The archive name (file name without ``.tar.gz``) is the top-level
    directory inside the archive. For every archive whose name prefix is not
    ``soscleaner`` the member name is additionally prefixed with ``./``.

    Args:
        archive_path: Full path of the archive; must match ``_PATH_PATTERN``.
        file_path: Path of the wanted file relative to the archive's
            top-level directory, e.g. ``data/etc/ssh/sshd_config``.

    Returns:
        The member name to look up in the archive.

    Raises:
        ValueError: If ``archive_path`` does not match ``_PATH_PATTERN``.
    """
    match = fullmatch(_PATH_PATTERN, archive_path)
    if match is None:
        # Without this guard the subscripting below would fail with an
        # uninformative "'NoneType' object is not subscriptable" TypeError.
        raise ValueError(
            "Archive path %s does not match the expected pattern."
            % (archive_path,)
        )

    archive_name, archive_prefix = match.groups()
    member_path = join(archive_name, file_path)
    if archive_prefix != "soscleaner":
        member_path = join(".", member_path)
    return member_path
def _extract_member(archive, member_path):
    """Return a readable binary file object for a member of the archive.

    Args:
        archive: An opened tar archive providing ``extractfile``.
        member_path: Name of the member inside the archive.

    Returns:
        The file object returned by ``archive.extractfile``.

    Raises:
        LookupError: If the archive has no such member, or if the member
            exists but has no readable content (``extractfile`` returned
            ``None``, which tarfile does e.g. for directories).
    """
    try:
        member = archive.extractfile(member_path)
    except KeyError as error:
        raise LookupError("File not found in archive.") from error

    if member is None:
        # Returning None would make the caller fail later while iterating
        # over the member; report it as a lookup failure right away.
        raise LookupError("Archive member is not a regular file.")
    return member
def _print_to_stderr(*args, **kwargs):
    """Call ``print`` with the given arguments, writing to ``sys.stderr``.

    ``sys.stderr`` is looked up at call time. Passing ``file`` in ``kwargs``
    is an error because the target stream is fixed here.
    """
    print(*args, file=sys.stderr, **kwargs)
def _print_to_stdout(*args, **kwargs):
    """Call ``print`` with the given arguments, writing to ``sys.stdout``.

    ``sys.stdout`` is looked up at call time. Passing ``file`` in ``kwargs``
    is an error because the target stream is fixed here.
    """
    print(*args, file=sys.stdout, **kwargs)
def main(archive_path, file_path):
    """Print the contents of one file from an archive to standard output.

    Progress and error messages go to standard error, so that standard
    output carries only the contents of the file.

    Args:
        archive_path: Path of the archive. When falsy, the path is looked up
            in standard input with ``_find_archive_path_in_stdin``.
        file_path: Path of the wanted file relative to the archive's
            top-level directory.

    Returns:
        None. When the archive path cannot be found in standard input, or
        the member lookup fails, the error is printed to standard error and
        the function returns early.
    """
    if not archive_path:
        try:
            archive_path = _find_archive_path_in_stdin()
        except ValueError as not_found:
            _print_to_stderr(not_found)
            return

    _print_to_stderr("Found archive path %s." % (archive_path,))

    with _open_archive(archive_path) as archive:
        member_path = _member_path(archive_path, file_path)
        try:
            member = _extract_member(archive, member_path)
        except LookupError as lookup_failure:
            _print_to_stderr(lookup_failure)
            return

        _print_to_stderr("Found member %s." % (member_path,))

        # Tell the user how to repeat the extraction without piping the
        # client output in again; every part is shell-quoted.
        rerun_parts = (sys.executable, __file__, archive_path, file_path)
        rerun_command = " ".join(quote(part) for part in rerun_parts)
        _print_to_stderr(
            "Re-run with the same archive and member with %s." % rerun_command
        )

        # The member has to be read while the archive is still open. Lines
        # keep their own terminators, hence end="".
        for raw_line in member:
            _print_to_stdout(raw_line.decode(_ENCODING), end="")
# --------- Tests begin here ---------
# The tests are defined only when pytest could be imported, so that the
# script itself keeps working without pytest installed.
if PYTEST:
    @patch(
        "%s.sys.stdin" % __name__,
        StringIO(
            "# Starting to collect Insights data for localhost.localdomain\n"
            "Archive saved at "
            "/var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz\n"
        )
    )
    def test_find_archive_path_in_stdin_found():
        """The path is taken from the "Archive saved at" input line."""
        expected = "/var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz"
        assert _find_archive_path_in_stdin() == expected

    @patch("%s.sys.stdin" % __name__, StringIO(""))
    def test_find_archive_path_in_stdin_not_found():
        """Empty input makes the lookup fail with a ValueError."""
        with raises(ValueError) as exception_info:
            _find_archive_path_in_stdin()
        assert str(exception_info.value) == "Archive path not found."

    def test_open_archive():
        """An existing gzipped archive is opened for reading."""
        with TemporaryDirectory(dir=_TMP_PATH) as tmp_dir:
            archive_path = join(
                tmp_dir, "insights-localhost-20211026190220.tar.gz"
            )
            # Create an empty archive to be opened below.
            with TarFile.open(archive_path, mode="w:gz") as _tar:
                pass
            with _open_archive(archive_path) as archive:
                assert archive.name == archive_path
                assert archive.mode == "r"

    def test_member_path():
        """An Insights archive member path gets the "./" prefix."""
        archive_path = (
            "/var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz"
        )
        file_path = "data/etc/ssh/sshd_config"
        expected = (
            "./insights-localhost-20211026190220/data/etc/ssh/sshd_config"
        )
        assert _member_path(archive_path, file_path) == expected

    def test_extract_member_found():
        """An existing (empty) member is returned as a readable object."""
        with TemporaryDirectory(dir=_TMP_PATH) as tmp_dir:
            archive_name = "insights-localhost-20211026190220"
            archive_path = join(tmp_dir, archive_name + ".tar.gz")
            member_path = "./%s/data/etc/ssh/sshd_config" % archive_name
            with TarFile.open(archive_path, mode="w:gz") as tar:
                tar.addfile(TarInfo(member_path))
            with TarFile.open(archive_path, mode="r:gz") as tar:
                member = _extract_member(tar, member_path)
                assert member.read() == b""

    def test_extract_member_not_found():
        """A missing member is reported as a LookupError."""
        with TemporaryDirectory(dir=_TMP_PATH) as tmp_dir:
            archive_name = "insights-localhost-20211026190220"
            archive_path = join(tmp_dir, archive_name + ".tar.gz")
            # Create an empty archive, i.e. one without the member.
            with TarFile.open(archive_path, mode="w:gz") as tar:
                pass
            with TarFile.open(archive_path, mode="r:gz") as tar:
                member_path = "./%s/data/etc/ssh/sshd_config" % archive_name
                with raises(LookupError) as exception_info:
                    _extract_member(tar, member_path)
                assert (
                    str(exception_info.value) == "File not found in archive."
                )

    @patch("%s._extract_member" % __name__)
    @patch("%s._member_path" % __name__)
    @patch("%s._open_archive" % __name__)
    @patch("%s._find_archive_path_in_stdin" % __name__)
    def test_main_arg_calls(
        find_archive_path_in_stdin,
        open_archive,
        member_path,
        extract_member,
        capsys
    ):
        """With an explicit archive path, stdin is not consulted."""
        archive_path = (
            "/var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz"
        )
        file_path = "data/etc/ssh/sshd_config"

        main(archive_path, file_path)

        find_archive_path_in_stdin.assert_not_called()
        open_archive.assert_called_once_with(archive_path)
        member_path.assert_called_once_with(archive_path, file_path)
        extract_member.assert_called_once_with(
            open_archive.return_value.__enter__.return_value,
            member_path.return_value
        )

    @patch("%s._extract_member" % __name__)
    @patch("%s._member_path" % __name__)
    @patch("%s._open_archive" % __name__)
    @patch(
        "%s._find_archive_path_in_stdin" % __name__,
        return_value=(
            "/var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz"
        )
    )
    def test_main_stdin_calls(
        find_archive_path_in_stdin,
        open_archive,
        member_path,
        extract_member,
        capsys
    ):
        """Without an archive path, the one found in stdin is used."""
        file_path = "data/etc/ssh/sshd_config"

        main(None, file_path)

        found_path = find_archive_path_in_stdin.return_value
        find_archive_path_in_stdin.assert_called_once_with()
        open_archive.assert_called_once_with(found_path)
        member_path.assert_called_once_with(found_path, file_path)
        extract_member.assert_called_once_with(
            open_archive.return_value.__enter__.return_value,
            member_path.return_value
        )

    @patch("%s._member_path" % __name__)
    @patch("%s._open_archive" % __name__)
    def test_main_output(_open_archive, _member_path, capsys):
        """The decoded member contents are written to stdout unchanged."""
        expected_output = "HostKey /etc/ssh/ssh_host_rsa_key\n"
        member = BytesIO(expected_output.encode(_ENCODING))
        with patch("%s._extract_member" % __name__, return_value=member):
            main(
                "/var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz",
                "data/etc/ssh/sshd_config"
            )
        out, _err = capsys.readouterr()
        assert out == expected_output

    @patch(
        "%s._find_archive_path_in_stdin" % __name__,
        side_effect=ValueError("not found")
    )
    def test_main_message_archive_not_found(
        find_archive_path_in_stdin, capsys
    ):
        """A failed stdin lookup prints only the error to stderr."""
        main(None, "data/etc/ssh/sshd_config")
        _out, err = capsys.readouterr()
        err_lines = err.rstrip("\n").split("\n")
        assert len(err_lines) == 1
        assert err_lines[0] == (
            "%s" % (find_archive_path_in_stdin.side_effect,)
        )

    @mark.parametrize(
        ("extract_member_config",),
        [({},), ({"side_effect": LookupError("not found")},)]
    )
    @patch("%s._extract_member" % __name__)
    @patch("%s._member_path" % __name__)
    @patch("%s._open_archive" % __name__)
    def test_main_message_archive_found(
        _open_archive,
        _member_path,
        extract_member,
        extract_member_config,
        capsys
    ):
        """The archive path is announced whether or not the member exists."""
        extract_member.configure_mock(**extract_member_config)
        archive_path = (
            "/var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz"
        )
        main(archive_path, "data/etc/ssh/sshd_config")
        _out, err = capsys.readouterr()
        err_lines = err.rstrip("\n").split("\n")
        assert err_lines[0] == "Found archive path %s." % (archive_path,)

    @patch("%s._extract_member" % __name__)
    @patch(
        "%s._member_path" % __name__,
        return_value="./insights-localhost-timestamp/file/path"
    )
    @patch("%s._open_archive" % __name__)
    def test_main_message_member_found(
        _open_archive, member_path, _extract_member, capsys
    ):
        """The second stderr line announces the member path."""
        main(
            "/var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz",
            "data/etc/ssh/sshd_config"
        )
        _out, err = capsys.readouterr()
        err_lines = err.rstrip("\n").split("\n")
        assert len(err_lines) == 3
        assert err_lines[1] == (
            "Found member %s." % (member_path.return_value,)
        )

    @patch("%s._extract_member" % __name__)
    @patch(
        "%s._member_path" % __name__,
        return_value="./insights-localhost-timestamp/file/path"
    )
    @patch("%s._open_archive" % __name__)
    def test_main_message_command(
        _open_archive, _member_path, _extract_member, capsys
    ):
        """The third stderr line shows the shell-quoted re-run command."""
        archive_path = (
            "/var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz"
        )
        file_path = "data/etc/ssh/sshd_config"
        main(archive_path, file_path)
        _out, err = capsys.readouterr()
        err_lines = err.rstrip("\n").split("\n")
        assert len(err_lines) == 3
        command_raw = [sys.executable, __file__, archive_path, file_path]
        command_shell = " ".join(map(quote, command_raw))
        assert err_lines[2] == (
            "Re-run with the same archive and member with %s." %
            (command_shell,)
        )
# Script entry point. Accepted command lines:
#   <script> <archive path> <file path>
#   <script> <file path>        (archive path is then read from stdin)
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    if len(cli_args) == 2:
        archive_path_arg, file_path_arg = cli_args
    elif len(cli_args) == 1:
        archive_path_arg = None
        (file_path_arg,) = cli_args
    else:
        raise ValueError("Invalid argument count.")

    main(archive_path_arg, file_path_arg)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment