Skip to content

Instantly share code, notes, and snippets.

@pirogoeth
Last active May 25, 2022 22:37
Show Gist options
  • Save pirogoeth/6168c93801b118774f765d8dc97b4782 to your computer and use it in GitHub Desktop.
Save pirogoeth/6168c93801b118774f765d8dc97b4782 to your computer and use it in GitHub Desktop.
Tail a file across many hosts
# -*- coding: utf-8 -*-
# from __future__ import annotations
import io
import logging
import os
import sys
from tempfile import mkstemp
from typing import List
import mitogen.core
import mitogen.master
import mitogen.select
import mitogen.utils
logger = logging.getLogger(__name__)
class Target:
""" Target host
"""
""" Name of the host to connect to. """
# hostname: str
hostname = None
""" Context used to call functions on the host. """
# context: mitogen.parent.Context
context = None
""" Receiver pipe the target delivers lines to. """
# receiver: mitogen.core.Receiver
receiver = None
# def __init__(self, hostname: str):
def __init__(self, hostname):
self.hostname = hostname
# def setup(self, router: mitogen.core.Router, sudo_as: Optional[str]=None, **kw):
def setup(self, router, sudo_as=None, **kw):
self.context = router.ssh(hostname=self.hostname, python_path=["/usr/bin/env", "python3"], **kw)
if sudo_as is not None:
self.context = router.sudo(username=sudo_as, via=self.context)
self.receiver = mitogen.core.Receiver(router)
self.receiver.target = self
# def stream_to(sender: mitogen.core.Sender, path: str, seek_offset: int = 0, seek_whence: str = "SEEK_END"):
def stream_to(sender, path, seek_offset = 0, seek_whence = "SEEK_END"):
""" Streams the file at path to sender. """
if not os.path.exists(path):
# logger.error(f"File {path} does not exist on {sender.receiver.target.hostname}")
logger.error("File {path} does not exist on {host}".format(path=path, host=sender.receiver.target.hostname))
elif not os.path.isfile(path):
# logger.error(f"Item at {path} is not a file on {sender.receiver.target.hostname}")
logger.error("Item at {path} is not a file on {host}".format(path=path, host=sender.receiver.target.hostname))
oswhence = {"SEEK_SET": io.SEEK_SET, "SEEK_CUR": io.SEEK_CUR, "SEEK_END": io.SEEK_END}[seek_whence]
# logger.debug(f"Opening {path} to read from {whence}")
logger.info("Opening {path} to read from {whence}".format(path=path, whence=seek_whence))
with io.open(path, mode="rb") as source:
source.seek(seek_offset, oswhence)
while source.readable():
line = source.readline()
if line != b"":
sender.send(line)
sender.close()
# def stream_from(selector: mitogen.select.Select):
def stream_from(selector):
""" Streams file data from a select to stdout. """
while True:
msg = selector.get()
line = msg.unpickle().rstrip().decode(encoding="utf-8", errors="ignore")
# print(f"{msg.receiver.target.hostname}: {line}")
print("{host}: {line}".format(host=msg.receiver.target.hostname, line=line))
def main():
import click
from halo import Halo
@click.command()
@click.option("-H", "--host", type=str, help="Hostname to tail on, can be specified multiple times", multiple=True)
@click.option("--hosts-stdin/--no-hosts-stdin", type=bool, help="Read a list of newline-delimited hosts from stdin")
@click.option("-p", "--path", type=str, help="Path to tail", required=True)
@click.option("--seek-offset", type=int, help="Byte offset to seek in the file", default=0)
@click.option("--seek-whence", type=click.Choice(["SEEK_SET", "SEEK_END"]), default="SEEK_END")
@click.option("--sudo-as", type=str, help="User to sudo to for reading")
@click.option("--debug/--no-debug", type=bool, help="Whether debug logging should be turned on", default=False)
# def main(host: List[str], hosts_stdin: bool, path: str, seek_offset: int, seek_whence: str, sudo_as: Optional[str]=None, debug: bool=False):
def _main(host, hosts_stdin, path, seek_offset, seek_whence, sudo_as=None, debug=False):
log_level = "DEBUG" if debug else "INFO"
log_level = logging._nameToLevel[log_level]
logging.basicConfig(level=log_level, stream=sys.stderr, format="[%(levelname)s] [%(asctime)s] %(message)s")
try:
broker = mitogen.master.Broker()
router = mitogen.master.Router(broker=broker)
connect_hosts = []
if hosts_stdin:
while not sys.stdin.closed:
line = sys.stdin.readline()
# if the line, BEFORE STRIPPING, is empty string, it's an EOF.
if line != "":
connect_hosts.append(line.strip())
else:
break
else:
connect_hosts = host
logger.debug("Connecting to hosts {hosts}".format(hosts=connect_hosts))
targets = []
with Halo(text="Connecting to hosts...", spinner="dots") as progress:
for _host in connect_hosts:
progress.text = "Connecting {host}".format(host=_host)
try:
target = Target(hostname=_host)
target.setup(router, sudo_as=sudo_as)
targets.append(target)
except Exception as err:
progress.fail(text=str(err))
logger.exception("Error while connecting to host {host}".format(host=_host))
progress.succeed(text="Connected to {count} hosts!".format(count=len(targets)))
# Attach a select to all target receivers
select = mitogen.select.Select(oneshot=False)
[select.add(target.receiver) for target in targets]
# Get all target hosts started streaming logs through the receiver.
calls = []
for target in targets:
logger.info("Start tailing on {host}".format(host=target.hostname))
sender = target.receiver.to_sender()
call = target.context.call_async(stream_to, sender, path, seek_offset=seek_offset, seek_whence=seek_whence)
call.target = target
calls.append(call)
try:
logger.info("Start streaming from children")
stream_from(select)
except KeyboardInterrupt:
pass
finally:
logger.info("Shutting down")
broker.shutdown()
broker.join()
_main()
[tool.poetry]
name = "multitail"
version = "0.1.0"
description = ""
authors = ["Sean Johnson <me@seanj.dev>"]
[tool.poetry.dependencies]
python = "^3.10"
mitogen = "^0.3.2"
click = "^8.1.3"
halo = "^0.0.31"
[tool.poetry.dev-dependencies]
flake8 = "^4.0.1"
black = "^22.3.0"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
multitail = "multitail:main"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment