Skip to content

Instantly share code, notes, and snippets.

@learnitall
Created January 6, 2024 21:55
Show Gist options
  • Save learnitall/2c86da0a05bb2810bd8b013f0b60bdcc to your computer and use it in GitHub Desktop.
Save learnitall/2c86da0a05bb2810bd8b013f0b60bdcc to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
status.py: Display status of resources in the terminal.
Requires rich and mctools. Developed on Python 3.10.
"""
import re
import time
from datetime import datetime
from string import digits, printable

import psutil
from mctools import PINGClient, RCONClient
from pystemd.systemd1 import Unit
from rich.align import Align
from rich.console import Group
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.progress import Progress, TextColumn, BarColumn
from rich.table import Table
from rich.text import Text
# Seconds main() sleeps between successive redraws of the live display.
RENDER_INTERVAL_SECONDS = 1
# Host name handed to the Minecraft ping and RCON clients by render_all().
MINECRAFT_HOST = "localhost"
# Moon glyphs; render_mc_server_info() indexes this with (day number % 8).
MOON_PHASES = (
"🌕", "🌖", "🌗", "🌘", "🌑", "🌒", "🌓", "🌔",
)
# ASCII-art banner that render_header() draws in bold green.
HEADER_ART = """+------+.
|`. | `.
| `+--+---+
| | | | MCPaper
+---+--+. | Status
`. | `.| Display
`+------+"""
def percentage_to_color(percent: float) -> str:
    """
    Map a fraction to a traffic-light color name.

    The fraction is compared against fixed upper bounds, in order:
    * percent <= 0.33: "green"
    * percent <= 0.66: "yellow"
    * anything else: "red"
    """
    # Ordered (inclusive upper bound, color) pairs; the first match wins.
    bands = ((0.33, "green"), (0.66, "yellow"))
    for upper_bound, color in bands:
        if percent <= upper_bound:
            return color
    return "red"
def format_bytes(num, suffix="B") -> str:
    """
    Render a byte count with a binary (1024-based) unit prefix.

    The value is divided by 1024 until its magnitude drops below 1024 and
    is then printed with one decimal place, the matching prefix and
    ``suffix``. Values too large for "Zi" are reported in "Yi".
    From: https://stackoverflow.com/questions/1094841/get-human-readable-version-of-file-size
    """
    binary_prefixes = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi")
    scaled = num
    for prefix in binary_prefixes:
        if abs(scaled) < 1024.0:
            return f"{scaled:3.1f}{prefix}{suffix}"
        scaled = scaled / 1024.0
    # Ran out of prefixes: whatever is left is expressed in yobibytes.
    return f"{scaled:.1f}Yi{suffix}"
def only_digits(target: str) -> str:
    """Return ``target`` with every character outside ``string.digits`` removed."""
    kept = filter(lambda ch: ch in digits, target)
    return "".join(kept)
def only_ascii(target: str) -> str:
    """Return ``target`` with every character outside ``string.printable`` removed."""
    kept = filter(lambda ch: ch in printable, target)
    return "".join(kept)
def new_basic_progress_bar(
    background_color: str = "bar.back",
    progress_color: str = "bar.complete",
) -> Progress:
    """
    Build a two-column Progress: a bold label with the completed value
    printed as a percentage, followed by a bar.

    ``background_color`` styles the bar itself and ``progress_color`` its
    completed portion. The Progress expands to the available width and
    does not auto-refresh.
    """
    label_column = TextColumn(
        "[bold][progress.description]{task.description}: [/bold]"
        "{task.completed:.2f}%",
    )
    bar_column = BarColumn(
        style=background_color,
        complete_style=progress_color,
    )
    return Progress(
        label_column,
        bar_column,
        auto_refresh=False,
        expand=True,
    )
def render_header() -> Group:
    """
    Build the McPaper header: the ASCII-art banner in bold green with the
    current local date and time centered beneath it.
    """
    banner = Text(HEADER_ART, justify="left", style="green bold")
    clock = Text(datetime.now().ctime(), justify="center")
    return Group(banner, clock)
def render_cpu_usage() -> Progress:
    """
    Build a progress bar showing the average CPU utilisation reported by
    psutil, colored by how busy the CPU is.
    Callers should leave at least 0.1 seconds between calls.
    """
    cpu_percent = psutil.cpu_percent()
    bar_color = percentage_to_color(cpu_percent / 100)
    bar = new_basic_progress_bar(progress_color=bar_color)
    task_id = bar.add_task("Average CPU Time")
    # The task starts at 0, so advancing by the reading sets the bar to it.
    bar.update(task_id, advance=cpu_percent)
    return bar
def render_mem_usage() -> Progress:
    """
    Build a progress bar showing the fraction of memory still available.

    The bar length reflects free memory, while its color is chosen from
    the used fraction (1 - free).
    """
    snapshot = psutil.virtual_memory()
    free_fraction = snapshot.available / snapshot.total
    bar_color = percentage_to_color(1 - free_fraction)
    bar = new_basic_progress_bar(progress_color=bar_color)
    task_id = bar.add_task("Memory Free")
    bar.update(task_id, advance=free_fraction * 100)
    return bar
class NetworkUsageHandler:
    """
    NetworkUsageHandler renders per-interface network throughput.

    Throughput is derived from the difference between two consecutive
    psutil counter snapshots, so the instance remembers the previous
    snapshot and when it was taken. The first call to render() therefore
    only primes that state and returns a table with no rows.
    """

    # Interfaces that are never shown.
    OMIT = ("lo", "virbr0", "docker0")

    def __init__(self):
        # Counter snapshot and timestamp from the previous render() call.
        self.prev_counters = None
        self.prev_timestamp = None

    @staticmethod
    def bps_to_str(raw: float) -> str:
        """Format a bytes-per-second rate as a human-readable string."""
        return f"{format_bytes(raw)} /s"

    def render(self) -> Table:
        """
        Return a table with one row per interface: receive rate, receive
        drops, transmit rate and transmit drops.

        Raises:
            ValueError: if psutil returns no interface counters at all.
        """
        t = Table(box=False, expand=True, pad_edge=False)
        t.add_column("Netdev", justify="left", style="cyan")
        for prefix in ("Rx", "Tx"):
            t.add_column(
                f"{prefix} Bytes", justify="right", style="green",
            )
            t.add_column(
                f"{prefix} Drops", justify="right", style="red",
            )

        timestamp = datetime.now()
        counters = psutil.net_io_counters(pernic=True)
        if not counters:
            raise ValueError(
                "Unable to grab inet counters, got empty result"
            )
        for iname in self.OMIT:
            counters.pop(iname, None)

        if self.prev_counters is None or self.prev_timestamp is None:
            self.prev_counters, self.prev_timestamp = counters, timestamp
            return t

        # total_seconds() keeps the fractional part; .seconds would truncate
        # to whole seconds and could be 0, dividing by zero below.
        delta_time = (timestamp - self.prev_timestamp).total_seconds()
        if delta_time > 0:
            # Only interfaces present in both snapshots have a meaningful
            # delta. Sorting keeps the row order stable between redraws.
            for iname in sorted(self.prev_counters.keys() & counters.keys()):
                counts = counters[iname]
                prev_counts = self.prev_counters[iname]
                rx_speed = (
                    counts.bytes_recv - prev_counts.bytes_recv
                ) / delta_time
                tx_speed = (
                    counts.bytes_sent - prev_counts.bytes_sent
                ) / delta_time
                # dropin counts dropped incoming (Rx) packets and dropout
                # dropped outgoing (Tx) packets.
                t.add_row(
                    iname,
                    self.bps_to_str(rx_speed), str(counts.dropin),
                    self.bps_to_str(tx_speed), str(counts.dropout),
                )

        self.prev_counters = counters
        self.prev_timestamp = timestamp
        return t
def render_cpu_temperature() -> Text:
    """
    Render CPU temperature in celsius.
    Assumes the system only has a single CPU.

    Returns a red notice instead of a reading when no temperature sensors
    can be read at all. The reading is colored red within 10 degrees of
    the sensor's "high" threshold, yellow within 20 degrees, and green
    otherwise; 100.0 is used when the sensor reports no threshold.

    Raises:
        KeyError: if sensors exist but there is no "coretemp" group, or
            the group has no package-level ("... id 0") entry.
    """
    # psutil does not define sensors_temperatures on every platform, so
    # look it up defensively and treat its absence as "no sensors".
    read_temps = getattr(psutil, "sensors_temperatures", None)
    temps = read_temps() if read_temps is not None else {}
    if not temps:
        return Text(
            "OS does not support reading hardware temp", style="red",
        )

    core_temps = temps.get("coretemp")
    if core_temps is None:
        raise KeyError("Unable to find cpu temperature (coretemp)")

    # Some systems have 'Physical id 0', others have
    # 'Package id 0', so just check for something that ends
    # with 'id 0'. Individual cores are formatted as "Core x".
    cputemp = next(
        (entry for entry in core_temps if entry.label.endswith("id 0")),
        None,
    )
    if cputemp is None:
        # Report the available entries in the exception rather than
        # printing them, which would corrupt a live terminal display.
        raise KeyError(
            "Unable to find temperature for CPU (Physical id 0): "
            f"{core_temps}"
        )

    t = Text("CPU Temperature: ")
    t.stylize("bold")

    high = cputemp.high if cputemp.high is not None else 100.0
    if high - 10 <= cputemp.current:
        color = "red"
    elif high - 20 <= cputemp.current:
        color = "yellow"
    else:
        color = "green"

    reading = Text(f"{cputemp.current}°C")
    reading.stylize(color)
    t.append(reading)
    return t
def render_systemd_unit(name: str) -> Text:
    """
    Build a one-line status for the systemd unit called ``name``: the
    unit name in bold, then its upper-cased active state and main PID.

    The state text is green for "active", red for "failed" and yellow
    for every other state.
    """
    unit = Unit(name.encode("utf-8"), _autoload=True)
    main_pid = unit.Service.MainPID
    active_state = unit.Unit.ActiveState.decode("utf-8")

    if active_state == "active":
        state_color = "green"
    elif active_state == "failed":
        state_color = "red"
    else:
        state_color = "yellow"

    line = Text(f"{name}: ")
    line.stylize("bold")
    details = Text(f"{active_state.upper()} (PID {main_pid})")
    details.stylize(state_color)
    line.append(details)
    return line
# Matches ANSI SGR sequences (for example the "\x1b[0m" reset) so they can
# be removed as whole units before any per-character filtering.
_ANSI_SGR_RE = re.compile(r"\x1b\[[0-9;]*m")


def _strip_ansi(raw: str) -> str:
    """Return ``raw`` with every ANSI SGR escape sequence removed."""
    return _ANSI_SGR_RE.sub("", raw)


def _append_field(t: Text, label: str, value: str) -> None:
    """Append a bold ``label`` on a new line of ``t``, then plain ``value``."""
    t.append(Text(f"\n{label}: ", style="bold"))
    t.append(value)


def _append_warning(t: Text, message: str) -> None:
    """Append ``message`` on a new line of ``t`` in yellow."""
    t.append_text(Text(f"\n{message}", style="yellow"))


def _query_rcon_digits(t: Text, rcon, command: str) -> str:
    """
    Run ``command`` over RCON and return only the digits of the reply.

    A non-string reply is reported on ``t`` as a warning and yields "".
    """
    reply = rcon.command(command)
    if not isinstance(reply, str):
        _append_warning(
            t, f"Unknown response from rcon for '{command}': {reply}",
        )
        return ""
    # Strip escape sequences first so digits inside them (the "0" of a
    # "[0m" reset, say) cannot leak into the parsed number.
    return only_digits(_strip_ansi(reply))


def _append_rcon_time_info(t: Text, rcon) -> None:
    """Append the time-of-day and moon-phase lines obtained over RCON."""
    daytime_ticks = _query_rcon_digits(t, rcon, "/time query daytime")
    if daytime_ticks:
        # The tick count is divided by 20 and shown as an H:M:S duration.
        time_secs = int(daytime_ticks) / 20.0
        _append_field(
            t, "Time", time.strftime("%H:%M:%S", time.gmtime(time_secs)),
        )
    day_count = _query_rcon_digits(t, rcon, "/time query day")
    if day_count:
        _append_field(
            t, "Moon Phase", MOON_PHASES[int(day_count) % len(MOON_PHASES)],
        )


def render_mc_server_info(
    host: str, ping_port: int, rcon_port: int, password: str,
) -> Text:
    """
    Render information about a Minecraft server.
    Information is pulled using the Server List Ping interface and the
    RCON interface.
    See https://wiki.vg/Server_List_Ping and https://wiki.vg/RCON
    for more information.

    Args:
        host: server host name or address.
        ping_port: port used for the Server List Ping query.
        rcon_port: port used for the RCON connection.
        password: RCON password.

    Returns:
        A Text starting with a bold "MC Server <host>: " label. If the
        ping query fails, the error message is appended and the whole
        text is red. Otherwise the MOTD, player count, ping and version
        are appended when present, followed by the RCON-derived time and
        moon phase. RCON problems are appended as yellow warnings and
        never raise.
    """
    t = Text(f"MC Server {host}: ")
    t.stylize("bold")

    pclient = PINGClient(host, port=ping_port)
    try:
        stats = pclient.get_stats(return_packet=False)
        if not isinstance(stats, dict):
            raise TypeError(
                f"Unknown response from mc server {host}, expected dict: "
                f"{stats}",
            )
    except Exception as err:
        # Show the failure in the panel instead of crashing the display.
        t.append(str(err))
        t.stylize("red")
        return t
    finally:
        # Release the ping connection on the error path as well.
        pclient.stop()

    motd = stats.get("description")
    if motd is not None:
        motd_text = motd if isinstance(motd, str) else str(motd)
        # Remove whole escape sequences, then any remaining characters
        # outside string.printable. Stripping the characters "[", "0" and
        # "m" from the ends would also eat real MOTD text.
        t.append(only_ascii(_strip_ansi(motd_text)))

    num_online = stats.get("players", {}).get("online")
    if num_online is not None:
        _append_field(t, "Online Players", str(num_online))

    latency = stats.get("time")
    if latency is not None:
        # NOTE(review): the value is scaled by 100 before being labelled
        # "ms" - confirm the unit of mctools' "time" field.
        _append_field(t, "Ping", f"{latency*100:.2f}ms")

    version = stats.get("version", {}).get("name")
    if version is not None:
        _append_field(t, "Version", f"{version}")

    rcon = RCONClient(host, port=rcon_port)
    try:
        if rcon.login(password):
            _append_rcon_time_info(t, rcon)
        else:
            # Without authentication no command can succeed, so stop here.
            _append_warning(t, "WARN: Unable to authenticate to RCON")
    except Exception as err:
        # Mirror the ping section: report the problem in the panel rather
        # than letting it take down the whole display.
        _append_warning(t, f"WARN: RCON query failed: {err}")
    finally:
        rcon.stop()
    return t
def render_all(
    network_usage_handler: NetworkUsageHandler,
    *,
    mc_host: str = MINECRAFT_HOST,
    mc_ping_port: int = 13337,
    mc_rcon_port: int = 25575,
    # NOTE(review): this default is a credential committed to source; it is
    # kept only for backward compatibility. Prefer passing the password in.
    mc_rcon_password: str = "time-for-holiday-break-and-to-play-minecraft-8",
    systemd_units: tuple[str, ...] = (
        "grafana.service",
        "prometheus.service",
        "minecraft-exporter.service",
        "prometheus-node-exporter.service",
        "minecraft-server.service",
        "caddy.service",
    ),
) -> Align:
    """
    Build the complete dashboard as a single renderable.

    The root layout is split into three columns: the left column holds
    the header and the Minecraft server panel side by side, the middle
    column the systemd unit panel, and the right column the node panel
    (CPU, memory and network usage).

    Args:
        network_usage_handler: stateful handler used for the network table.
        mc_host: Minecraft server host.
        mc_ping_port: Server List Ping port.
        mc_rcon_port: RCON port.
        mc_rcon_password: RCON password.
        systemd_units: unit names listed, in order, in the systemd panel.
    """
    layout = Layout(name="root")
    layout.split_row(
        Layout(name="left"),
        Layout(name="middle"),
        Layout(name="right"),
    )
    layout["left"].split_row(
        Layout(name="left-left"),
        Layout(name="left-right"),
    )
    layout["left-left"].update(
        Align.center(render_header(), vertical="bottom")
    )
    layout["left-right"].update(
        Align.center(
            Panel(
                Group(
                    render_mc_server_info(
                        mc_host, mc_ping_port, mc_rcon_port,
                        mc_rcon_password,
                    ),
                ),
                title="Minecraft Server Info",
            ),
            vertical="bottom",
        )
    )
    layout["middle"].update(
        Align.center(
            Panel(
                Align.center(
                    Group(
                        *(
                            render_systemd_unit(unit_name)
                            for unit_name in systemd_units
                        ),
                    ),
                    vertical="bottom"
                ),
                title="Systemd Units",
            ),
            vertical="bottom"
        )
    )
    layout["right"].update(
        Align.center(
            Panel(
                Group(
                    render_cpu_usage(),
                    render_mem_usage(),
                    network_usage_handler.render(),
                ),
                title="Node Info",
            ),
            vertical="bottom",
        )
    )
    return Align.center(layout, vertical="bottom")
def main():
    """
    Run the dashboard: draw it once, then redraw it every
    RENDER_INTERVAL_SECONDS until interrupted with Ctrl-C.
    """
    handler = NetworkUsageHandler()

    def draw() -> Align:
        # One handler is reused so rates can be computed between frames.
        return render_all(handler)

    with Live(draw()) as live:
        try:
            while True:
                time.sleep(RENDER_INTERVAL_SECONDS)
                live.update(draw())
        except KeyboardInterrupt:
            return


# Only start the display when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment